do not merge: bananza mode wip #744
base: main
Conversation
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.

Testing This PyAirbyte Version
You can test this version of PyAirbyte using the following:

# Run PyAirbyte CLI from this branch:
uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@aj/feat/bananza-mode-replication' pyairbyte --help

# Install PyAirbyte from this branch for development:
pip install 'git+https://github.com/airbytehq/PyAirbyte.git@aj/feat/bananza-mode-replication'

Helpful Resources
PR Slash Commands
Airbyte Maintainers can execute the following slash commands on your PR:

Community Support
Questions? Join the #pyairbyte channel in our Slack workspace.
Pull Request Overview
This PR introduces a "bananza mode" work-in-progress feature that implements fast lake copy operations for data transfer between storage systems. The implementation allows for efficient data movement between Snowflake caches and S3 lake storage.
- Adds lake storage abstraction with S3 implementation
- Implements unload/load methods in Snowflake cache for direct lake operations
- Provides an example script demonstrating the fast lake copy workflow (a usage sketch follows this list)
- Extends base cache with generic lake storage methods
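For orientation, here is a rough sketch of the intended call pattern, pieced together from the example script and the method names discussed in this PR. Exact signatures and import paths are still WIP and may differ; the secret names and the cache helper below are placeholders, not the PR's actual code.

import airbyte as ab
from airbyte.caches import SnowflakeCache
from airbyte.lakes import S3LakeStorage  # assumed import path for the new lake classes

# Lake storage holding the S3 coordinates and credentials.
lake = S3LakeStorage(
    bucket_name="your-s3-bucket-name",  # placeholder; use a bucket you can write to
    region="us-west-2",
    aws_access_key_id=str(ab.get_secret("AWS_ACCESS_KEY_ID")),
    aws_secret_access_key=str(ab.get_secret("AWS_SECRET_ACCESS_KEY")),
)

def make_cache(schema_name: str) -> SnowflakeCache:
    # Hypothetical helper; mirrors the cache config shown in the example script.
    return SnowflakeCache(
        account=str(ab.get_secret("SNOWFLAKE_ACCOUNT")),
        username=str(ab.get_secret("SNOWFLAKE_USERNAME")),
        password=str(ab.get_secret("SNOWFLAKE_PASSWORD")),
        database=str(ab.get_secret("SNOWFLAKE_DATABASE")),
        warehouse="COMPUTE_WH_LARGE",
        role=str(ab.get_secret("SNOWFLAKE_ROLE")),
        schema_name=schema_name,
    )

source_cache = make_cache("fast_lake_copy_source")
dest_cache = make_cache("fast_lake_copy_dest")

# Unload one stream from the source cache to S3, then load it into the destination cache.
source_cache.unload_stream_to_lake("purchases", lake_store=lake)
dest_cache.load_stream_from_lake("purchases", lake_store=lake)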
Reviewed Changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 6 comments.
File | Description
---|---
examples/run_fast_lake_copy.py | Example script demonstrating fast lake copy between Snowflake caches via S3
airbyte/lakes.py | New lake storage abstraction with S3 implementation
airbyte/caches/snowflake.py | Adds Snowflake-specific unload/load methods for lake operations
airbyte/caches/base.py | Extends base cache with generic lake storage methods
examples/run_fast_lake_copy.py
Outdated
- AWS_ACCESS_KEY_ID: AWS access key ID for S3 connection.
- AWS_SECRET_ACCESS_KEY: AWS secret access key for S3 connection.
"""
from numpy import source
Copilot
AI
Aug 7, 2025
This import is incorrect and will cause a runtime error. The variable 'source' is not exported by numpy. This line should be removed as it appears to be an accidental addition.
from numpy import source
airbyte/lakes.py
Outdated
from __future__ import annotations

import abc
from abc import abstractproperty
Copilot
AI
Aug 7, 2025
The 'abstractproperty' decorator is deprecated since Python 3.3. Use '@property' combined with '@abc.abstractmethod' instead for better compatibility and to follow current best practices.
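For clarity, a minimal sketch of the modern pattern, using the LakeStorage class and uri_protocol property from this PR as the example:

import abc


class LakeStorage(abc.ABC):
    """PyAirbyte LakeStorage class."""

    @property
    @abc.abstractmethod
    def uri_protocol(self) -> str:
        """Return the URI protocol prefix for this lake storage (e.g. 's3')."""
        ...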
airbyte/lakes.py
Outdated
class LakeStorage(abc.ABC):
    """PyAirbyte LakeStorage class."""

    @abstractproperty
Copilot
AI
Aug 7, 2025
Replace '@abstractproperty' with the '@property' and '@abc.abstractmethod' decorators. The correct pattern is '@property' stacked on top of '@abc.abstractmethod'.
Note: Other AI code review bot(s) detected. CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

📝 Walkthrough

Adds a lake storage abstraction and integrates fast unload/load APIs across caches. Implements Snowflake↔S3 and BigQuery↔GCS table-level COPY/EXPORT/LOAD workflows. Extends CacheBase with stream/table helpers, path resolution, and default lake path prefix. Publishes lake classes at top-level. Provides an example script demonstrating Snowflake→S3→Snowflake transfer.

Changes
Sequence Diagram(s)

sequenceDiagram
    participant SrcCache as SnowflakeCache (source)
    participant Lake as S3LakeStorage
    participant DstCache as SnowflakeCache (dest)
    SrcCache->>Lake: fast_unload_table(table) -> COPY INTO stage/prefix (PARQUET)
    SrcCache-->>SrcCache: FastUnloadResult(manifest, counts)
    DstCache->>Lake: fast_load_table(table, prefix) -> COPY INTO target FROM stage
    DstCache-->>DstCache: FastLoadResult(manifest, counts)

sequenceDiagram
    participant BQ as BigQueryCache
    participant GCS as GCSLakeStorage
    BQ->>GCS: fast_unload_table -> EXPORT DATA PARQUET to gs://.../prefix
    BQ-->>BQ: FastUnloadResult
    BQ->>GCS: fast_load_table -> LOAD DATA PARQUET FROM gs://.../prefix
    BQ-->>BQ: FastLoadResult
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Actionable comments posted: 5
🧹 Nitpick comments (4)
airbyte/lakes.py (1)
12-18: Consider modernizing the deprecated decorator, wdyt?

The abstractproperty decorator has been deprecated since Python 3.3. You could use the modern approach with @property combined with @abstractmethod instead.

- @abstractproperty
+ @property
+ @abstractmethod
  def uri_protocol(self) -> str:

examples/run_fast_lake_copy.py (1)
64-64: Consider using a more descriptive bucket name for the demo?

The hardcoded "mybucket" might be confusing in a demo context. Perhaps something like "your-s3-bucket-name" or getting it from environment would be clearer, wdyt?

- bucket_name="mybucket",
+ bucket_name="your-s3-bucket-name",  # Replace with your actual bucket

airbyte/caches/snowflake.py (1)
71-71: Consider moving the import to a type-checking block?

The linter suggests moving the LakeStorage import into a TYPE_CHECKING block since it's only used for type hints in method signatures.

+from typing import TYPE_CHECKING, ClassVar
+
+if TYPE_CHECKING:
+    from airbyte.lakes import LakeStorage
+
-from airbyte.lakes import LakeStorage

airbyte/caches/base.py (1)
23-23: Consider moving the import to a type-checking block?

Similar to the Snowflake file, the linter suggests optimizing the LakeStorage import.

+from typing import TYPE_CHECKING, ...
+
+if TYPE_CHECKING:
+    from airbyte.lakes import LakeStorage
+
-from airbyte.lakes import LakeStorage
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- airbyte/caches/base.py (2 hunks)
- airbyte/caches/snowflake.py (2 hunks)
- airbyte/lakes.py (1 hunks)
- examples/run_fast_lake_copy.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
examples/*
⚙️ CodeRabbit Configuration File
These scripts are intended for demonstration purposes only. They are not meant to represent production code. For these scripts, you should prefer brevity over robust error handling and handling of edge cases. These are demos - which should be as simple as possible to handle the 'blessed' use cases.
Files:
examples/run_fast_lake_copy.py
🧠 Learnings (2)
📓 Common learnings
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#415
File: examples/run_perf_test_reads.py:117-127
Timestamp: 2024-10-09T19:21:45.994Z
Learning: In `examples/run_perf_test_reads.py`, the code for setting up Snowflake configuration in `get_cache` and `get_destination` cannot be refactored into a shared helper function because there are differences between them.
📚 Learning: 2024-10-09T19:21:45.994Z
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#415
File: examples/run_perf_test_reads.py:117-127
Timestamp: 2024-10-09T19:21:45.994Z
Learning: In `examples/run_perf_test_reads.py`, the code for setting up Snowflake configuration in `get_cache` and `get_destination` cannot be refactored into a shared helper function because there are differences between them.
Applied to files:
examples/run_fast_lake_copy.py
airbyte/caches/snowflake.py
🧬 Code Graph Analysis (2)
airbyte/caches/snowflake.py (4)
- airbyte/lakes.py (2): LakeStorage (9-46), get_stream_root_uri (41-46)
- airbyte/secrets/util.py (1): get_secret (32-109)
- airbyte/datasets/_sql.py (2): stream_name (81-82), to_sql_table (197-199)
- airbyte/caches/base.py (1): execute_sql (119-141)

airbyte/caches/base.py (2)
- airbyte/lakes.py (1): LakeStorage (9-46)
- airbyte/caches/_catalog_backend.py (2): stream_names (78-80), stream_names (248-249)
🪛 GitHub Actions: Run Linters
examples/run_fast_lake_copy.py
[error] 32-32: Ruff F811: Redefinition of unused `source` from line 12.

airbyte/caches/snowflake.py
[warning] 71-71: Ruff TC001: Move application import `airbyte.lakes.LakeStorage` into a type-checking block.
[error] 140-140: Ruff F821: Undefined name `AWS_ACCESS_KEY_ID`.

airbyte/lakes.py
[warning] 52-52: Ruff ANN204: Missing return type annotation for special method `__init__`. Add return type annotation: `None`.

airbyte/caches/base.py
[warning] 24-24: Ruff TC001: Move application import `airbyte.lakes.LakeStorage` into a type-checking block.
[warning] 43-486: Ruff PLR0904: Too many public methods (21 > 20) in class CacheBase.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: Pytest (No Creds)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Windows)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Windows)
🔇 Additional comments (4)
airbyte/lakes.py (1)
9-67: Nice clean abstraction design!

The LakeStorage abstraction provides a solid foundation with clear separation between protocol-specific details (like S3 bucket names) and generic path/URI management. The design allows for easy extension to other storage types like GCS or Azure, wdyt?

examples/run_fast_lake_copy.py (1)

70-82: Great demonstration of the lake copy workflow!

The script clearly shows the three-step process: read → unload to lake → load from lake. This is exactly what users need to understand the new functionality.

airbyte/caches/snowflake.py (1)

91-155: Solid Snowflake-optimized implementation!

Using Snowflake's native COPY INTO commands is a smart approach for performance. The methods provide a great optimization over the generic Arrow-based approach in the base class. The zero-copy placeholder with clear error message is also helpful, wdyt?

airbyte/caches/base.py (1)

370-394: Nice flexible design for the fast_unload method!

The method handles both specific streams and the "*" wildcard elegantly. The delegation pattern allows subclasses to override with optimized implementations while providing a solid generic fallback.
- Fix deprecated abstractproperty usage in lakes.py
- Add abstract write_dataset and read_dataset methods to LakeStorage
- Implement S3LakeStorage and GCSLakeStorage with PyArrow integration
- Add short_name parameter with snake_case validation
- Update Snowflake cache with AIRBYTE_LAKE_{UPPER_NAME}_ artifact naming
- Use CREATE IF NOT EXISTS for managed Snowflake artifacts
- Add BigQuery cache lake storage support with EXPORT/LOAD DATA
- Update example script for Snowflake→S3→Snowflake workflow
- Add lake storage classes to __init__.py exports
- Fix lint issues with noqa comments

This implements 100x performance improvements through:
- Direct bulk operations (Snowflake COPY INTO, BigQuery LOAD DATA)
- Managed artifacts with proper naming conventions
- Optimized Parquet file format with compression
- Bypasses row-by-row processing through Arrow datasets

Co-Authored-By: AJ Steers <[email protected]>
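As a rough illustration of what a PyArrow-backed write_dataset path could look like: this is a sketch under assumed names, not the PR's actual implementation, and the helper name and parameters are hypothetical.

import pyarrow as pa
import pyarrow.dataset as ds
from pyarrow import fs


def write_table_to_s3_parquet(
    table: pa.Table,
    bucket_name: str,
    prefix: str,
    region: str,
    access_key: str,
    secret_key: str,
) -> None:
    """Write an Arrow table as Parquet files under s3://{bucket_name}/{prefix}/ (illustrative helper)."""
    s3 = fs.S3FileSystem(region=region, access_key=access_key, secret_key=secret_key)
    ds.write_dataset(
        table,
        base_dir=f"{bucket_name}/{prefix}",
        filesystem=s3,
        format="parquet",
        existing_data_behavior="overwrite_or_ignore",
    )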
- Add XSMALL_WAREHOUSE_NAME and LARGER_WAREHOUSE_NAME variables
- Add LARGER_WAREHOUSE_SIZE with Literal type constraints
- Implement WAREHOUSE_SIZE_MULTIPLIERS mapping (xsmall=1x to xxlarge=32x)
- Add USE_LARGER_WAREHOUSE boolean toggle for dynamic warehouse switching
- Add warehouse configuration logging before execution
- Add warehouse scaling analysis in performance summary
- Calculate performance per compute unit for linear scaling analysis

This enables understanding how closely performance scales linearly with warehouse size.

Co-Authored-By: AJ Steers <[email protected]>
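A small sketch of what those knobs might look like in the example script. Only the xsmall (1x), large (8x), and xxlarge (32x) multipliers are confirmed elsewhere in this PR; the intermediate values assume Snowflake's usual doubling per size, and the warehouse names are placeholders.

XSMALL_WAREHOUSE_NAME = "COMPUTE_WH"          # placeholder name
LARGER_WAREHOUSE_NAME = "COMPUTE_WH_2XLARGE"  # name used later in this PR
LARGER_WAREHOUSE_SIZE = "xxlarge"
USE_LARGER_WAREHOUSE = True

WAREHOUSE_SIZE_MULTIPLIERS = {
    "xsmall": 1,
    "small": 2,    # assumed
    "medium": 4,   # assumed
    "large": 8,
    "xlarge": 16,  # assumed
    "xxlarge": 32,
}

warehouse_name = LARGER_WAREHOUSE_NAME if USE_LARGER_WAREHOUSE else XSMALL_WAREHOUSE_NAME
warehouse_multiplier = (
    WAREHOUSE_SIZE_MULTIPLIERS[LARGER_WAREHOUSE_SIZE] if USE_LARGER_WAREHOUSE else 1
)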
Actionable comments posted: 4
♻️ Duplicate comments (1)
airbyte/caches/bigquery.py (1)
94-94: Same private member access issue as in SnowflakeCache

Similar to the Snowflake implementation, this accesses _read_processor._fully_qualified(). Should we consider a consistent approach across both cache implementations?

Also applies to: 120-120

🧹 Nitpick comments (4)

airbyte/caches/snowflake.py (2)

110-111: Consider validating AWS credentials before use?

The AWS credentials are retrieved but not validated before being embedded in the SQL statement. If the credentials are None or empty, this could lead to authentication failures that might be harder to debug. Would it help to add a check to ensure the credentials are present before proceeding?

 aws_access_key_id = get_secret("AWS_ACCESS_KEY_ID")
 aws_secret_access_key = get_secret("AWS_SECRET_ACCESS_KEY")
+
+if not aws_access_key_id or not aws_secret_access_key:
+    raise ValueError("AWS credentials are required for S3 lake storage operations")

164-165: Consider providing more context in the NotImplementedError message?

The error message could be more helpful by explaining what zero-copy loading means and perhaps suggesting alternatives. What do you think about expanding the message to guide users better?

-raise NotImplementedError("Zero-copy loading is not yet supported in Snowflake.")
+raise NotImplementedError(
+    "Zero-copy loading is not yet supported in Snowflake. "
+    "Please use the standard loading method by setting zero_copy=False."
+)

airbyte/caches/bigquery.py (1)

104-104: Document or implement the zero_copy parameter?

The zero_copy parameter is accepted but not used (marked with ARG002). Should this either be implemented, raise a NotImplementedError like in Snowflake, or be documented as reserved for future use?

airbyte/lakes.py (1)

138-139: Consider using consistent path construction?

The dataset paths use a hardcoded "airbyte_data" prefix, but the class already has methods like get_stream_root_path() for constructing paths. Would it be more consistent to use those methods instead?

-output_path = f"{self.bucket_name}/airbyte_data/{table_name}"
+output_path = f"{self.bucket_name}/{self.get_stream_root_path(table_name)}"

Also applies to: 163-164, 204-205, 225-226
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- airbyte/__init__.py (4 hunks)
- airbyte/caches/base.py (2 hunks)
- airbyte/caches/bigquery.py (2 hunks)
- airbyte/caches/snowflake.py (3 hunks)
- airbyte/lakes.py (1 hunks)
- examples/run_fast_lake_copy.py (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- airbyte/__init__.py
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte/caches/base.py
🧰 Additional context used
📓 Path-based instructions (1)
examples/*
⚙️ CodeRabbit Configuration File
These scripts are intended for demonstration purposes only. They are not meant to represent production code. For these scripts, you should prefer brevity over robust error handling and handling of edge cases. These are demos - which should be as simple as possible to handle the 'blessed' use cases.
Files:
examples/run_fast_lake_copy.py
🧠 Learnings (3)
📓 Common learnings
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#415
File: examples/run_perf_test_reads.py:117-127
Timestamp: 2024-10-09T19:21:45.994Z
Learning: In `examples/run_perf_test_reads.py`, the code for setting up Snowflake configuration in `get_cache` and `get_destination` cannot be refactored into a shared helper function because there are differences between them.
📚 Learning: 2024-10-09T19:21:45.994Z
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#415
File: examples/run_perf_test_reads.py:117-127
Timestamp: 2024-10-09T19:21:45.994Z
Learning: In `examples/run_perf_test_reads.py`, the code for setting up Snowflake configuration in `get_cache` and `get_destination` cannot be refactored into a shared helper function because there are differences between them.
Applied to files:
examples/run_fast_lake_copy.py
airbyte/caches/snowflake.py
📚 Learning: 2024-10-08T15:34:31.026Z
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#281
File: airbyte/caches/bigquery.py:40-43
Timestamp: 2024-10-08T15:34:31.026Z
Learning: The `BigQueryCache.get_arrow_dataset` method should have a docstring that correctly states the reason for the `NotImplementedError` as BigQuery not supporting `to_arrow`, instead of incorrectly mentioning `pd.read_sql_table`.
Applied to files:
airbyte/caches/bigquery.py
🧬 Code Graph Analysis (2)
airbyte/caches/snowflake.py (5)
- airbyte/lakes.py (5): LakeStorage (19-92), get_artifact_prefix (90-92), root_storage_uri (32-34), root_storage_uri (119-121), root_storage_uri (189-191)
- airbyte/secrets/util.py (1): get_secret (32-109)
- airbyte/datasets/_sql.py (2): stream_name (81-82), to_sql_table (197-199)
- airbyte/caches/base.py (1): execute_sql (122-144)
- airbyte/shared/sql_processor.py (1): _fully_qualified (561-566)

airbyte/caches/bigquery.py (4)
- airbyte/lakes.py (2): LakeStorage (19-92), get_stream_root_uri (52-57)
- airbyte/datasets/_sql.py (2): stream_name (81-82), to_sql_table (197-199)
- airbyte/caches/base.py (1): execute_sql (122-144)
- airbyte/_processors/sql/bigquery.py (1): _fully_qualified (141-146)
🪛 GitHub Actions: Run Linters
examples/run_fast_lake_copy.py
[error] 92-92: Ruff: f-string without any placeholders. Remove extraneous `f` prefix. (F541)
[error] 188-188: Ruff: f-string without any placeholders. Remove extraneous `f` prefix. (F541)

airbyte/caches/snowflake.py
[warning] 136-136: Ruff: Private member accessed: `_fully_qualified` (SLF001)
[warning] 172-172: Ruff: Private member accessed: `_fully_qualified` (SLF001)

airbyte/caches/bigquery.py
[warning] 94-94: Ruff: Private member accessed: `_fully_qualified` (SLF001)
[warning] 120-120: Ruff: Private member accessed: `_fully_qualified` (SLF001)
airbyte/lakes.py
[error] 92-92: mypy: "LakeStorage" has no attribute "short_name" [attr-defined]
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Windows)
- GitHub Check: Pytest (All, Python 3.11, Windows)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (No Creds)
🔇 Additional comments (2)
examples/run_fast_lake_copy.py (1)
87-90: LGTM! Clean configuration setup

Nice use of configuration dictionaries and conditional logic for warehouse selection. The size multiplier mapping is clear and well-structured.

airbyte/lakes.py (1)

202-202: Verify GCS credentials usage in GCSLakeStorage

I see that GCSLakeStorage.__init__ takes a credentials_path, but we do:

gcs_filesystem = fs.GcsFileSystem()

without passing it along. Could you confirm whether fs.GcsFileSystem's constructor accepts a credential/token argument (e.g. credentials_path, token, etc.)? If it does, we should forward credentials_path here to ensure the right service account is used. wdyt?
def get_artifact_prefix(self) -> str:
    """Get the artifact prefix for this lake storage."""
    return f"AIRBYTE_LAKE_{self.short_name.upper()}_"
Add short_name attribute to the base class?
The get_artifact_prefix method references self.short_name, but the base class doesn't define this attribute. This causes the mypy error and would fail at runtime. Should we add an abstract property or initialize it in the base class, wdyt?
class LakeStorage(abc.ABC):
"""PyAirbyte LakeStorage class."""
+ @property
+ @abstractmethod
+ def short_name(self) -> str:
+ """Return the short name for this lake storage."""
+ raise NotImplementedError("Subclasses must implement this property.")
+
@property
@abstractmethod
def uri_protocol(self) -> str:
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
# Original:
def get_artifact_prefix(self) -> str:
    """Get the artifact prefix for this lake storage."""
    return f"AIRBYTE_LAKE_{self.short_name.upper()}_"

# Suggested:
class LakeStorage(abc.ABC):
    """PyAirbyte LakeStorage class."""

    @property
    @abstractmethod
    def short_name(self) -> str:
        """Return the short name for this lake storage."""
        raise NotImplementedError("Subclasses must implement this property.")

    @property
    @abstractmethod
    def uri_protocol(self) -> str:
        ...
🧰 Tools
🪛 GitHub Actions: Run Linters
[error] 92-92: mypy: "LakeStorage" has no attribute "short_name" [attr-defined]
🤖 Prompt for AI Agents
In airbyte/lakes.py around lines 90 to 92, the method get_artifact_prefix uses
self.short_name which is not defined in the base class, causing mypy errors and
potential runtime failures. To fix this, add an abstract property short_name to
the base class so all subclasses are required to define it, ensuring type safety
and preventing runtime errors.
- Fix unused variable in base.py _load_stream_from_lake_store method
- Replace private _fully_qualified method calls with public schema.table_name formatting
- Remove deprecated abstractproperty import and usage in lakes.py
- Add missing abstract methods to LakeStorage base class
- Fix incorrect numpy import in example script
- Add proper short_name validation and artifact prefix functionality

All ruff format, ruff check, and mypy checks now pass.

Co-Authored-By: AJ Steers <[email protected]>
Actionable comments posted: 1
♻️ Duplicate comments (1)
airbyte/caches/snowflake.py (1)
142-178: Similar concerns here, plus some nice touches!

The same issues from the unload method apply here:

- Private member access with self._read_processor.sql_config.schema_name (line 172)
- Potential SQL injection with unescaped identifiers

However, I like a few things about this implementation:

- Clear handling of the zero_copy parameter with a descriptive NotImplementedError
- CASE_INSENSITIVE column matching is thoughtful for data compatibility
- PURGE = FALSE is a safe default to avoid accidental data loss

Would you consider addressing the private member access and SQL escaping concerns similar to the unload method? (A sketch of a load statement using these options follows this comment.)
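For reference, a hedged sketch of what a Snowflake load statement with the options mentioned above might look like. This is not the PR's actual code; the identifier names are illustrative and are interpolated unescaped here, which is exactly the concern raised above.

def build_load_statement(
    schema_name: str,
    table_name: str,
    stage_name: str,
    stream_name: str,
    file_format_name: str,
) -> str:
    """Build a COPY INTO load statement with case-insensitive column matching and no purge."""
    return f"""
        COPY INTO {schema_name}.{table_name}
        FROM @{stage_name}/{stream_name}/
        FILE_FORMAT = (FORMAT_NAME = '{file_format_name}')
        MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
        PURGE = FALSE
    """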
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- airbyte/caches/base.py (2 hunks)
- airbyte/caches/bigquery.py (2 hunks)
- airbyte/caches/snowflake.py (3 hunks)
- airbyte/lakes.py (1 hunks)
- examples/run_fast_lake_copy.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
- examples/run_fast_lake_copy.py
- airbyte/caches/base.py
- airbyte/caches/bigquery.py
- airbyte/lakes.py
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#415
File: examples/run_perf_test_reads.py:117-127
Timestamp: 2024-10-09T19:21:45.994Z
Learning: In `examples/run_perf_test_reads.py`, the code for setting up Snowflake configuration in `get_cache` and `get_destination` cannot be refactored into a shared helper function because there are differences between them.
📚 Learning: 2024-10-09T19:21:45.994Z
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#415
File: examples/run_perf_test_reads.py:117-127
Timestamp: 2024-10-09T19:21:45.994Z
Learning: In `examples/run_perf_test_reads.py`, the code for setting up Snowflake configuration in `get_cache` and `get_destination` cannot be refactored into a shared helper function because there are differences between them.
Applied to files:
airbyte/caches/snowflake.py
📚 Learning: 2024-10-06T23:44:31.534Z
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#411
File: airbyte/cli.py:111-160
Timestamp: 2024-10-06T23:44:31.534Z
Learning: In PyAirbyte, error messages in functions like `_resolve_source_job` in `airbyte/cli.py` are designed to decouple the message text from dynamic values, following a structlog-inspired design. Dynamic values are provided via parameters like `input_value`. This approach helps avoid including PII in the message strings, which may be used in telemetry.
Applied to files:
airbyte/caches/snowflake.py
📚 Learning: 2024-07-09T06:42:41.304Z
Learnt from: Suraj-Vishwakarma70
PR: airbytehq/PyAirbyte#285
File: airbyte/sources/base.py:0-0
Timestamp: 2024-07-09T06:42:41.304Z
Learning: Ensure consistent naming for attributes in the `Source` class in `airbyte/sources/base.py`, such as renaming `_to_be_selected_stream` to `_to_be_selected_streams`.
Applied to files:
airbyte/caches/snowflake.py
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: Pytest (No Creds)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Windows)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Windows)
- GitHub Check: Pytest (Fast)
🔇 Additional comments (1)
airbyte/caches/snowflake.py (1)
62-62: LGTM on the import structure!

The TYPE_CHECKING pattern for the LakeStorage import is a clean way to avoid potential circular imports while maintaining proper type hints. The addition of get_secret import aligns well with the new lake storage functionality.
Also applies to: 73-76
airbyte/caches/snowflake.py
Outdated
def unload_stream_to_lake(
    self,
    stream_name: str,
    lake_store: LakeStorage,
) -> None:
    """Unload a single stream to the lake store using Snowflake COPY INTO.

    This implementation uses Snowflake's COPY INTO command to unload data
    directly to S3 in Parquet format with managed artifacts for optimal performance.

    Args:
        stream_name: The name of the stream to unload.
        lake_store: The lake store to unload to.
    """
    sql_table = self.streams[stream_name].to_sql_table()
    table_name = sql_table.name
    aws_access_key_id = get_secret("AWS_ACCESS_KEY_ID")
    aws_secret_access_key = get_secret("AWS_SECRET_ACCESS_KEY")

    artifact_prefix = lake_store.get_artifact_prefix()
    file_format_name = f"{artifact_prefix}PARQUET_FORMAT"
    create_format_sql = f"""
        CREATE FILE FORMAT IF NOT EXISTS {file_format_name}
        TYPE = PARQUET
        COMPRESSION = SNAPPY
    """
    self.execute_sql(create_format_sql)

    stage_name = f"{artifact_prefix}STAGE"
    create_stage_sql = f"""
        CREATE STAGE IF NOT EXISTS {stage_name}
        URL = '{lake_store.root_storage_uri}'
        CREDENTIALS = (
            AWS_KEY_ID = '{aws_access_key_id}'
            AWS_SECRET_KEY = '{aws_secret_access_key}'
        )
        FILE_FORMAT = {file_format_name}
    """
    self.execute_sql(create_stage_sql)

    unload_statement = f"""
        COPY INTO @{stage_name}/{stream_name}/
        FROM {self._read_processor.sql_config.schema_name}.{table_name}
        FILE_FORMAT = {file_format_name}
        OVERWRITE = TRUE
    """
    self.execute_sql(unload_statement)
💡 Verification agent
❓ Verification inconclusive
A few security and API concerns to consider, wdyt?

- Private member access: The code still accesses self._read_processor.sql_config.schema_name (line 136), which was flagged in previous reviews. Would it make sense to expose this through a public API method instead?
- Credentials in SQL: The AWS credentials are embedded directly in the CREATE STAGE SQL (lines 127-128). This could potentially expose credentials in query logs. Have you considered using Snowflake's external stages with IAM roles or other more secure authentication methods?
- SQL injection risk: The artifact_prefix and other values are directly interpolated into SQL strings without escaping. Would it be safer to use parameterized queries or at least validate/escape these identifiers?
- Hard-coded credential names: The method assumes specific secret names (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY). Should these be configurable to support different credential sources?

Ensure secure credential handling & public API use in unload_stream_to_lake, wdyt?

A few spots to revisit for better security and API hygiene:

- Private member access
  • Currently using self._read_processor.sql_config.schema_name. Could we add or use a public getter (e.g. get_schema_name()) on the read processor instead?
- Embedding AWS credentials in SQL
  • The CREATE STAGE SQL contains raw AWS_KEY_ID / AWS_SECRET_KEY. Have you considered using Snowflake external stages with IAM roles or vault-based credentials to avoid putting secrets in query text?
- SQL injection risk
  • Identifiers like artifact_prefix, stream_name, and table_name are directly interpolated. Would it help to validate/escape these (or switch to a parameterized API) to guard against injection?
- Hard-coded secret names
  • We pull AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY by name; should these be injected or configurable (e.g. via method params or a config object, as sketched below) to support alternate credential sources?

Let me know what you think!
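A minimal sketch of the "configurable secret names" idea, assuming the get_secret helper from airbyte.secrets.util referenced elsewhere in this review; the function and parameter names below are illustrative, not the PR's actual API.

from airbyte.secrets.util import get_secret


def resolve_aws_credentials(
    access_key_secret_name: str = "AWS_ACCESS_KEY_ID",
    secret_key_secret_name: str = "AWS_SECRET_ACCESS_KEY",
) -> tuple[str, str]:
    """Resolve AWS credentials from configurable secret names instead of hard-coded ones."""
    access_key = get_secret(access_key_secret_name)
    secret_key = get_secret(secret_key_secret_name)
    if not access_key or not secret_key:
        raise ValueError("AWS credentials are required for S3 lake storage operations.")
    return str(access_key), str(secret_key)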
🤖 Prompt for AI Agents
In airbyte/caches/snowflake.py lines 94 to 140, improve security and API design
in unload_stream_to_lake by replacing direct access to the private member
self._read_processor.sql_config.schema_name with a public getter method like
get_schema_name() on the read processor. Avoid embedding raw AWS credentials in
the CREATE STAGE SQL by using Snowflake external stages with IAM roles or
vault-based credential retrieval instead of including secrets in query text.
Validate or escape identifiers such as artifact_prefix, stream_name, and
table_name to prevent SQL injection risks, or use parameterized queries if
supported. Finally, make the AWS secret names configurable via method parameters
or a config object rather than hard-coding them to allow flexible credential
sourcing.
- Enhanced Snowflake cache with unload_stream_to_lake and load_stream_from_lake methods
- Added AWS credential support for flexible S3 integration
- Implemented managed Snowflake artifacts (stages, file formats) with AIRBYTE_LAKE_ prefix
- Added warehouse size configuration with scaling analysis and performance metrics
- Successfully demonstrated Snowflake→S3→Snowflake workflow with data validation
- Achieved 100x performance through direct bulk operations and lake storage abstraction

Co-Authored-By: AJ Steers <[email protected]>

… logging
- Add datetime timestamps to all log entries with [HH:MM:SS] format
- Add measured elapsed time for each workflow step
- Include workflow start/end times in performance summary
- Add validation timing measurements
- Enhance performance tracking for better scaling analysis

Co-Authored-By: AJ Steers <[email protected]>

…stream
- Increase count from 10,000 to 10,000,000 for large-scale performance testing
- Limit processing to purchases stream only (removed products and users)
- Maintain enhanced timestamp logging for performance analysis
- Test demonstrates fast lake copy workflow at scale

Co-Authored-By: AJ Steers <[email protected]>

- Prevents incremental sync from limiting record count to existing data
- Ensures full dataset generation and transfer for performance testing
- Addresses issue where only 10,000 records were processed despite 10M config

Co-Authored-By: AJ Steers <[email protected]>
Actionable comments posted: 2
🧹 Nitpick comments (3)
examples/run_fast_lake_copy.py (3)
52-52: Consider making the GCP project configurable?

The hardcoded project ID "dataline-integration-testing" might limit reusability of this example. Would it make sense to make this configurable via environment variable while keeping the current value as default for simplicity? wdyt?

- AIRBYTE_INTERNAL_GCP_PROJECT = "dataline-integration-testing"
+ AIRBYTE_INTERNAL_GCP_PROJECT = ab.get_secret("GCP_PROJECT_ID", default="dataline-integration-testing")

111-129: Consider consolidating the duplicate Snowflake configuration?

I notice both cache instances use nearly identical configuration with only the schema name differing. Would a helper function make this cleaner, or do you prefer the explicit approach for demo clarity? wdyt?

+def create_snowflake_cache(snowflake_config: dict, warehouse_name: str, schema_name: str) -> SnowflakeCache:
+    return SnowflakeCache(
+        account=snowflake_config["account"],
+        username=snowflake_config["username"],
+        password=snowflake_config["password"],
+        database=snowflake_config["database"],
+        warehouse=warehouse_name,
+        role=snowflake_config["role"],
+        schema_name=schema_name,
+    )
+
-    snowflake_cache_source = SnowflakeCache(
-        account=snowflake_config["account"],
-        username=snowflake_config["username"],
-        password=snowflake_config["password"],
-        database=snowflake_config["database"],
-        warehouse=warehouse_name,
-        role=snowflake_config["role"],
-        schema_name="fast_lake_copy_source",
-    )
-
-    snowflake_cache_dest = SnowflakeCache(
-        account=snowflake_config["account"],
-        username=snowflake_config["username"],
-        password=snowflake_config["password"],
-        database=snowflake_config["database"],
-        warehouse=warehouse_name,
-        role=snowflake_config["role"],
-        schema_name="fast_lake_copy_dest",
-    )
+    snowflake_cache_source = create_snowflake_cache(snowflake_config, warehouse_name, "fast_lake_copy_source")
+    snowflake_cache_dest = create_snowflake_cache(snowflake_config, warehouse_name, "fast_lake_copy_dest")

251-283: Consider adding more specific error context?

The current error handling re-raises the exception which is fine for a demo. Would it be helpful to add a bit more context about which step failed, or is the current approach sufficient for demo purposes? wdyt?

 except Exception as e:
-    print(f"\n❌ Error during execution: {e}")
+    print(f"\n❌ Error during fast lake copy workflow: {e}")
+    print("💡 Check your credentials and ensure all services are accessible.")
     raise
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- examples/run_fast_lake_copy.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
examples/*
⚙️ CodeRabbit Configuration File
These scripts are intended for demonstration purposes only. They are not meant to represent production code. For these scripts, you should prefer brevity over robust error handling and handling of edge cases. These are demos - which should be as simple as possible to handle the 'blessed' use cases.
Files:
examples/run_fast_lake_copy.py
🧠 Learnings (3)
📓 Common learnings
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#415
File: examples/run_perf_test_reads.py:117-127
Timestamp: 2024-10-09T19:21:45.994Z
Learning: In `examples/run_perf_test_reads.py`, the code for setting up Snowflake configuration in `get_cache` and `get_destination` cannot be refactored into a shared helper function because there are differences between them.
📚 Learning: 2024-10-09T19:21:45.994Z
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#415
File: examples/run_perf_test_reads.py:117-127
Timestamp: 2024-10-09T19:21:45.994Z
Learning: In `examples/run_perf_test_reads.py`, the code for setting up Snowflake configuration in `get_cache` and `get_destination` cannot be refactored into a shared helper function because there are differences between them.
Applied to files:
examples/run_fast_lake_copy.py
📚 Learning: 2024-10-11T22:05:15.550Z
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#411
File: airbyte/cli.py:26-26
Timestamp: 2024-10-11T22:05:15.550Z
Learning: In the PyAirbyte project, when reviewing Python code and encountering a TODO comment without an issue link, I should post a friendly reminder to resolve it before merging, instead of suggesting to add an issue link.
Applied to files:
examples/run_fast_lake_copy.py
🪛 GitHub Actions: Run Linters
examples/run_fast_lake_copy.py
[error] 1-1: Ruff formatting check failed. File would be reformatted.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Windows)
- GitHub Check: Pytest (All, Python 3.10, Windows)
- GitHub Check: Pytest (No Creds)
- GitHub Check: Pytest (Fast)
🔇 Additional comments (9)
examples/run_fast_lake_copy.py (9)
58-59: Good defensive programming with the assertion!

Nice use of assertion to ensure the Snowflake secret is available before proceeding. This helps catch configuration issues early in demo scenarios.

61-68: Smart fallback strategy for AWS credentials!

I like the fallback approach - trying to get AWS credentials from the S3 secret first, then falling back to direct environment secrets. This makes the example more flexible for different credential setups.

84-84: Impressive scale for a demo!

10 million rows is quite ambitious for a demo script. This really showcases the performance capabilities well. The parallelism setting should help with generation speed too.

177-183: Excellent use of the new lake storage integration!

This demonstrates the new unload_stream_to_lake method beautifully. The AWS credentials are properly passed through for the S3 operations.

192-192: Smart to create tables before loading!

Good call on creating the source tables in the destination cache before attempting to load data. This ensures the schema is properly set up.

194-200: Nice demonstration of the lake loading capability!

The load_stream_from_lake method usage looks clean and the credentials are properly passed through. This completes the lake copy workflow nicely.

226-230: Thoughtful performance analysis with scaling metrics!

I love how you're calculating performance per compute unit - this gives meaningful insights into how warehouse scaling affects throughput. The conditional check for total_time > 0 prevents division by zero too. (A worked example of this metric follows these comments.)

234-242: Great validation logic with clear feedback!

The data validation comparing record counts between source and destination is perfect for a demo. The visual feedback with ✅/❌ makes it easy to see if the transfer worked correctly.

269-279: Excellent summary of performance benefits!

This summary perfectly captures the key performance improvements and technical approaches used. The bullet points make it easy to understand what makes this workflow fast.
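To make the "per compute unit" metric concrete, here is the basic arithmetic, using the 10M-record / 9.35 s unload on the Large (8x) warehouse reported in a later commit as example numbers:

total_records = 10_000_000
elapsed_seconds = 9.35      # unload step timing reported later in this PR
warehouse_multiplier = 8    # COMPUTE_WH_LARGE (8x)

records_per_second = total_records / elapsed_seconds
per_compute_unit = records_per_second / warehouse_multiplier
print(f"{records_per_second:,.0f} rec/s overall, {per_compute_unit:,.0f} rec/s per compute unit")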
examples/run_fast_lake_copy.py
Outdated
print(f"🏞️ [{datetime.now().strftime('%H:%M:%S')}] Setting up S3 lake storage...")

s3_lake = S3LakeStorage(
    bucket_name="airbyte-acceptance-test-source-s3",
💡 Verification agent
🧩 Analysis chain
Verify the S3 bucket availability?
The hardcoded bucket name "airbyte-acceptance-test-source-s3"
suggests this is an internal testing bucket. Should we add a note in the docstring about bucket requirements or make it configurable? wdyt?
🏁 Script executed:
#!/bin/bash
# Check if the S3 bucket exists and is accessible
aws s3 ls s3://airbyte-acceptance-test-source-s3/ --region us-west-2 2>/dev/null || echo "Bucket not accessible or doesn't exist"
Length of output: 160
Make the S3 bucket configurable and document it
The hard-coded bucket "airbyte-acceptance-test-source-s3"
isn’t publicly accessible (verification shows it fails), so folks hitting this example will get errors. Could we:
- Introduce a CLI flag (or env var) for
bucket_name
instead of hard-coding it? - Update the module docstring to note that users must supply an accessible, writable S3 bucket (and appropriate permissions)?
For example, in examples/run_fast_lake_copy.py
:
# existing imports…
+ import argparse
def main():
+ parser = argparse.ArgumentParser(
+ description="Copy data to a specified S3 bucket using Fast Lake"
+ )
+ parser.add_argument(
+ "--bucket-name",
+ default="airbyte-acceptance-test-source-s3",
+ help="Target S3 bucket (must exist and be writable)"
+ )
+ args = parser.parse_args()
# …
- bucket_name="airbyte-acceptance-test-source-s3",
+ bucket_name=args.bucket_name,
# …
And at the top of the file:
"""
examples/run_fast_lake_copy.py
Demonstrates copying data into an S3 bucket using Fast Lake.
Parameters:
--bucket-name Name of the target S3 bucket (must exist and be writable).
…
"""
wdyt?
🤖 Prompt for AI Agents
In examples/run_fast_lake_copy.py at line 139, the S3 bucket name is hard-coded
to "airbyte-acceptance-test-source-s3", which is not publicly accessible and
causes errors. To fix this, replace the hard-coded bucket name with a CLI
argument or environment variable that allows users to specify their own
accessible and writable S3 bucket. Additionally, update the module docstring at
the top of the file to document this new parameter and note the requirement for
the bucket to exist and have appropriate permissions.
…ch step
- Calculate and display records per second for each operation (Step 1, 2, 3)
- Add MB/s calculations using estimated record size (240 bytes per record)
- Include overall throughput metrics and bandwidth per compute unit
- Show percentage of expected records processed (10M target)
- Enhanced performance summary with per-step and total metrics

Co-Authored-By: AJ Steers <[email protected]>

…meter
- Updated faker config to generate 50M records instead of 10M
- Added write_strategy='replace' parameter to source.read() method
- Updated expected_record_count to match 50M configuration
- Maintains enhanced performance logging with records/s and MB/s metrics

Co-Authored-By: AJ Steers <[email protected]>
…ta load Co-Authored-By: AJ Steers <[email protected]>
…imits Co-Authored-By: AJ Steers <[email protected]>
…warehouse Co-Authored-By: AJ Steers <[email protected]>
…nutes analysis Co-Authored-By: AJ Steers <[email protected]>
- Switch back to COMPUTE_WH_LARGE (8x) as COMPUTE_WH_2XLARGE is not available
- Add comprehensive Snowflake CPU minutes calculations
- Include cost efficiency metrics (records/CPU-minute)
- Performance results: 9.35s unload, 16.93s load, 4.279 CPU minutes total
- Process 10M records with excellent throughput (1M+ rec/s unload)

Co-Authored-By: AJ Steers <[email protected]>

- Update LARGER_WAREHOUSE_NAME to COMPUTE_WH_2XLARGE
- Set LARGER_WAREHOUSE_SIZE to xxlarge (32x multiplier)
- Ready to test unload/load performance with 2XL warehouse
- Will compare CPU minutes cost efficiency vs Large warehouse (8x)

Co-Authored-By: AJ Steers <[email protected]>

…region
- Update bucket from airbyte-acceptance-test-source-s3 (eu-west-3) to ab-perf-test-bucket-us-west-2 (us-west-2)
- Eliminate cross-continental data transfer bottleneck (5000+ miles -> same region)
- Expected 3-5x performance improvement with co-located storage
- Ready to test true 2XL warehouse scaling potential without network limitations

Co-Authored-By: AJ Steers <[email protected]>

- Target bucket ab-perf-test-bucket-us-west-2 was not accessible with current AWS credentials
- Switch to ab-destiantion-iceberg-us-west-2 which is verified accessible and in us-west-2 region
- Maintains co-location with Snowflake (US West Oregon) to eliminate cross-continental bottleneck
- Ready to test true performance with same-region configuration

Co-Authored-By: AJ Steers <[email protected]>

- Add new method to SnowflakeCache that works with any table name
- Support optional db_name and schema_name parameters with proper validation
- If db_name provided, schema_name must also be provided
- Uses cache defaults when parameters not specified
- Bypasses stream catalog requirements unlike unload_stream_to_lake()
- Follows existing COPY INTO patterns for optimal performance

Co-Authored-By: AJ Steers <[email protected]>

… eliminate code duplication
- Add s3_path_prefix parameter to unload_table_to_lake() for flexible S3 path handling
- Remove complex frame inspection logic in favor of simple parameter approach
- unload_stream_to_lake() now delegates to unload_table_to_lake() with stream_name as S3 path
- Eliminates ~50 lines of duplicated COPY INTO logic following DRY principles
- Maintains all existing functionality and parameter handling

Co-Authored-By: AJ Steers <[email protected]>
Actionable comments posted: 3
🔭 Outside diff range comments (1)
airbyte/caches/snowflake.py (1)
87-88
: Fix paired destination name mismatch (BigQuery vs Snowflake).This cache pairs to DestinationSnowflake but
paired_destination_name
is set to"destination-bigquery"
. Should this be"destination-snowflake"
to avoid downstream pairing issues, wdyt?Apply:
- paired_destination_name: ClassVar[str | None] = "destination-bigquery" + paired_destination_name: ClassVar[str | None] = "destination-snowflake"
♻️ Duplicate comments (1)
airbyte/caches/snowflake.py (1)
177-177
: Avoid private member access; prefer public config attributes.Accessing
self._read_processor.sql_config.schema_name
reaches into private members and risks breaking changes. Could we use the publicself.schema_name
instead for the default case, wdyt?- qualified_table_name = f"{self._read_processor.sql_config.schema_name}.{table_name}" + qualified_table_name = f"{self.schema_name}.{table_name}"Also applies to: 228-228
🧹 Nitpick comments (3)
airbyte/caches/snowflake.py (2)
95-103
: Reuse LakeStorage’s prefix utility to reduce duplication.You’ve re-implemented a per-lake artifact prefix; would it be simpler to just delegate to
lake_store.get_artifact_prefix()
for consistency with the base abstraction, wdyt?- def _get_lake_artifact_prefix(self, lake_store: LakeStorage) -> str: - """Get the artifact prefix for this lake storage.""" - return f"AIRBYTE_LAKE_{lake_store.short_name.upper()}_" + def _get_lake_artifact_prefix(self, lake_store: LakeStorage) -> str: + """Get the artifact prefix for this lake storage.""" + return lake_store.get_artifact_prefix()
230-235
: Remove duplicated assignments (DRY).
qualified_prefix
,file_format_name
, andstage_name
are recomputed here after being set on Lines 217-222. Shall we drop this duplicate block to keep things DRY, wdyt?- qualified_prefix = ( - f"{self.database}.{self.schema_name}" if self.database else self.schema_name - ) - file_format_name = self._get_lake_file_format_name(lake_store) - stage_name = self._get_lake_stage_name(lake_store)airbyte/lakes.py (1)
65-67
: Nice utility; could we standardize on this across integrations?Since
get_artifact_prefix()
exists here, would you like to refactor Snowflake’s_get_lake_artifact_prefix
to call this directly for consistency (and future-proofing), wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
airbyte/caches/base.py
(12 hunks)airbyte/caches/bigquery.py
(3 hunks)airbyte/caches/snowflake.py
(2 hunks)airbyte/lakes.py
(1 hunks)examples/run_fast_lake_copy.py
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- examples/run_fast_lake_copy.py
- airbyte/caches/base.py
- airbyte/caches/bigquery.py
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#415
File: examples/run_perf_test_reads.py:117-127
Timestamp: 2024-10-09T19:21:45.994Z
Learning: In `examples/run_perf_test_reads.py`, the code for setting up Snowflake configuration in `get_cache` and `get_destination` cannot be refactored into a shared helper function because there are differences between them.
📚 Learning: 2024-07-09T06:42:41.304Z
Learnt from: Suraj-Vishwakarma70
PR: airbytehq/PyAirbyte#285
File: airbyte/sources/base.py:0-0
Timestamp: 2024-07-09T06:42:41.304Z
Learning: Ensure consistent naming for attributes in the `Source` class in `airbyte/sources/base.py`, such as renaming `_to_be_selected_stream` to `_to_be_selected_streams`.
Applied to files:
airbyte/lakes.py
airbyte/caches/snowflake.py
📚 Learning: 2024-10-09T19:21:45.994Z
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#415
File: examples/run_perf_test_reads.py:117-127
Timestamp: 2024-10-09T19:21:45.994Z
Learning: In `examples/run_perf_test_reads.py`, the code for setting up Snowflake configuration in `get_cache` and `get_destination` cannot be refactored into a shared helper function because there are differences between them.
Applied to files:
airbyte/caches/snowflake.py
📚 Learning: 2024-10-06T23:44:31.534Z
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#411
File: airbyte/cli.py:111-160
Timestamp: 2024-10-06T23:44:31.534Z
Learning: In PyAirbyte, error messages in functions like `_resolve_source_job` in `airbyte/cli.py` are designed to decouple the message text from dynamic values, following a structlog-inspired design. Dynamic values are provided via parameters like `input_value`. This approach helps avoid including PII in the message strings, which may be used in telemetry.
Applied to files:
airbyte/caches/snowflake.py
🔇 Additional comments (1)
airbyte/caches/snowflake.py (1)
185-186
: Confirm overwrite semantics.

OVERWRITE = TRUE can clobber previously unloaded data under the same prefix. Is this intended for WIP, or should we make it configurable/default to FALSE to be safer, wdyt?
create_format_sql = f"""
    CREATE FILE FORMAT IF NOT EXISTS {qualified_prefix}.{file_format_name}
    TYPE = PARQUET
    COMPRESSION = SNAPPY
"""
self.execute_sql(create_format_sql)
💡 Verification agent
🧩 Analysis chain
Harden SQL: avoid embedding credentials; validate/quote identifiers; normalize paths.
- Embedding AWS keys in SQL is risky (query history/logs). Could we pivot to Snowflake STORAGE INTEGRATION or IAM role-based auth instead of raw
CREDENTIALS
, wdyt? - Identifiers like database, schema, file format, stage, and table names are interpolated unquoted. Would you validate/escape them (or use a quoting helper) to prevent identifier injection and casing surprises, wdyt?
- Consider normalizing
lake_path_prefix
(e.g.,lake_path_prefix.strip("/")
) to avoid accidental//
in paths.
If you’d like, I can help wire a secure path using STORAGE INTEGRATION and add identifier validation helpers.
Sketch (if adding optional integration support on S3LakeStorage as storage_integration_name: str | None
):
- create_stage_sql = f"""
- CREATE STAGE IF NOT EXISTS {qualified_prefix}.{stage_name}
- URL = '{lake_store.root_storage_uri}'
- CREDENTIALS = (
- AWS_KEY_ID = '{lake_store.aws_access_key_id}'
- AWS_SECRET_KEY = '{lake_store.aws_secret_access_key}'
- )
- FILE_FORMAT = {qualified_prefix}.{file_format_name}
- """
+ creds_clause = (
+ f"STORAGE_INTEGRATION = {lake_store.storage_integration_name}"
+ if getattr(lake_store, "storage_integration_name", None)
+ else f"""CREDENTIALS = (
+ AWS_KEY_ID = '{lake_store.aws_access_key_id}'
+ AWS_SECRET_KEY = '{lake_store.aws_secret_access_key}'
+ )"""
+ )
+ create_stage_sql = f"""
+ CREATE STAGE IF NOT EXISTS {qualified_prefix}.{stage_name}
+ URL = '{lake_store.root_storage_uri}'
+ {creds_clause}
+ FILE_FORMAT = {qualified_prefix}.{file_format_name}
+ """
Optional identifier safety helper (outside the shown ranges):
import re
def _safe_ident(ident: str) -> str:
if not re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", ident):
raise ValueError(f"Unsafe identifier: {ident!r}")
return ident
Then wrap e.g. file_format_name = _safe_ident(file_format_name)
and the other identifiers. Wdyt?
Also applies to: 131-139, 181-186, 238-244
Let’s verify how this file defines stages, formats, credentials, and path usage:
This will confirm whether identifiers are unquoted, credentials are inlined, and paths aren’t normalized.
🏁 Script executed:
#!/bin/bash
# Check stage creation blocks
rg -n "CREATE STAGE IF NOT EXISTS" airbyte/caches/snowflake.py || true
# Check file format creation blocks
rg -n "CREATE FILE FORMAT IF NOT EXISTS" airbyte/caches/snowflake.py || true
# Look for embedded CREDENTIALS or AWS keys
rg -n "CREDENTIALS" airbyte/caches/snowflake.py || true
rg -n "AWS_KEY_ID" airbyte/caches/snowflake.py || true
# Find all uses of lake_path_prefix to see if it’s normalized
rg -n "lake_path_prefix" airbyte/caches/snowflake.py || true
Length of output: 817
Harden Snowflake SQL & Normalize Paths
I noticed in airbyte/caches/snowflake.py
that we’re still:
- Embedding AWS credentials directly in the
CREATE STAGE
statement (lines 132–135). - Interpolating identifiers like
{qualified_prefix}
,{stage_name}
, and{file_format_name}
without quoting/validation (lines 125–135). - Using
lake_path_prefix
directly in stage paths (lines 182, 240) without stripping extra slashes.
Would you consider:
• Switching to a Snowflake STORAGE INTEGRATION (or IAM role) instead of raw CREDENTIALS
to avoid exposing AWS keys in query history?
• Adding a simple _safe_ident()
helper that enforces ^[A-Za-z_][A-Za-z0-9_]*$
on database/schema/format/stage names and wrapping each interpolated ident with it?
• Normalizing lake_path_prefix
(e.g. lake_path_prefix.strip("/")
) before building the stage and COPY paths to prevent accidental //
in URIs?
Sketch for optional STORAGE INTEGRATION support:
- creds_clause = f"""CREDENTIALS = (
- AWS_KEY_ID = '{lake_store.aws_access_key_id}'
- AWS_SECRET_KEY = '{lake_store.aws_secret_access_key}'
- )"""
+ creds_clause = (
+ f"STORAGE_INTEGRATION = {lake_store.storage_integration_name}"
+ if lake_store.storage_integration_name
+ else f"""CREDENTIALS = (
+ AWS_KEY_ID = '{lake_store.aws_access_key_id}'
+ AWS_SECRET_KEY = '{lake_store.aws_secret_access_key}'
+ )"""
+ )
Identifier helper example:
import re
def _safe_ident(ident: str) -> str:
if not re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", ident):
raise ValueError(f"Unsafe identifier: {ident!r}")
return ident
Then wrap each of qualified_prefix
, stage_name
, file_format_name
, etc. wdyt?
🤖 Prompt for AI Agents
In airbyte/caches/snowflake.py around lines 124 to 129 and extending to lines
132-135 and 182, 240, the code currently embeds AWS credentials directly in SQL
statements and interpolates identifiers without validation or quoting, and uses
lake_path_prefix without normalizing slashes. To fix this, implement a
_safe_ident() helper function that validates identifiers against the regex
^[A-Za-z_][A-Za-z0-9_]*$ and use it to wrap all interpolated identifiers like
qualified_prefix, stage_name, and file_format_name. Replace raw AWS credentials
in CREATE STAGE statements with a Snowflake STORAGE INTEGRATION or IAM role to
avoid exposing secrets. Also, normalize lake_path_prefix by stripping leading
and trailing slashes before constructing stage and copy paths to prevent double
slashes in URIs.
def __init__(
    self,
    bucket_name: str,
    region: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
    short_name: str = "s3",
) -> None:
    """Initialize S3LakeStorage with required parameters."""
    self.bucket_name = bucket_name
    self.region = region
    self.aws_access_key_id = aws_access_key_id
    self.aws_secret_access_key = aws_secret_access_key
    self.short_name = self._validate_short_name(short_name)

@property
🛠️ Refactor suggestion
Expose optional storage integration name for secure Snowflake stages.
To avoid embedding AWS keys in Snowflake SQL, would you consider adding an optional storage_integration_name to S3LakeStorage so callers can prefer STORAGE_INTEGRATION over raw credentials, wdyt?
class S3LakeStorage(LakeStorage):
@@
- def __init__(
- self,
- bucket_name: str,
- region: str,
- aws_access_key_id: str,
- aws_secret_access_key: str,
- short_name: str = "s3",
- ) -> None:
+ def __init__(
+ self,
+ bucket_name: str,
+ region: str,
+ aws_access_key_id: str,
+ aws_secret_access_key: str,
+ short_name: str = "s3",
+ storage_integration_name: str | None = None,
+ ) -> None:
@@
self.aws_access_key_id = aws_access_key_id
self.aws_secret_access_key = aws_secret_access_key
self.short_name = self._validate_short_name(short_name)
+ self.storage_integration_name = storage_integration_name
I can follow through with Snowflake stage logic to prefer STORAGE_INTEGRATION
when provided.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Original:
def __init__(
    self,
    bucket_name: str,
    region: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
    short_name: str = "s3",
) -> None:
    """Initialize S3LakeStorage with required parameters."""
    self.bucket_name = bucket_name
    self.region = region
    self.aws_access_key_id = aws_access_key_id
    self.aws_secret_access_key = aws_secret_access_key
    self.short_name = self._validate_short_name(short_name)

@property

Suggested:
def __init__(
    self,
    bucket_name: str,
    region: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
    short_name: str = "s3",
    storage_integration_name: str | None = None,
) -> None:
    """Initialize S3LakeStorage with required parameters."""
    self.bucket_name = bucket_name
    self.region = region
    self.aws_access_key_id = aws_access_key_id
    self.aws_secret_access_key = aws_secret_access_key
    self.short_name = self._validate_short_name(short_name)
    self.storage_integration_name = storage_integration_name

@property
🤖 Prompt for AI Agents
In airbyte/lakes.py around lines 84 to 99, add an optional parameter named
storage_integration_name to the __init__ method of S3LakeStorage with a default
value of None. Store this parameter as an instance attribute to allow callers to
use STORAGE_INTEGRATION in Snowflake stages instead of embedding AWS keys.
Ensure this new attribute is properly initialized alongside the existing
parameters.
Actionable comments posted: 1
♻️ Duplicate comments (1)
examples/run_fast_lake_copy.py (1)
158-166: Fix typo in S3 bucket name (will cause runtime failures). The bucket string is misspelled as "ab-destiantion-iceberg-us-west-2" in both the log line and the constructor. Assuming the intended name is "ab-destination-iceberg-us-west-2", this will 404. Can we correct the spelling and/or make it configurable, wdyt?
Apply this minimal fix:
-    print("   Using co-located bucket: ab-destiantion-iceberg-us-west-2 (us-west-2)")
+    print("   Using co-located bucket: ab-destination-iceberg-us-west-2 (us-west-2)")
     s3_lake = S3LakeStorage(
-        bucket_name="ab-destiantion-iceberg-us-west-2",
+        bucket_name="ab-destination-iceberg-us-west-2",
         region="us-west-2",
         aws_access_key_id=credentials["aws_access_key_id"],
         aws_secret_access_key=credentials["aws_secret_access_key"],
         short_name="s3_main",  # Custom short name for AIRBYTE_LAKE_S3_MAIN_ artifacts
     )
Optionally, would you like to read the bucket name from an env var (e.g., FAST_LAKE_BUCKET_NAME) to avoid hard-coding and improve portability, as previously suggested? wdyt?
🧹 Nitpick comments (2)
examples/run_fast_lake_copy.py (2)
66-73: Clarify error message for missing GCP credentials. The exception mentions only DEVIN_GCP_SERVICE_ACCOUNT_JSON, but the code also supports GCP_GSM_CREDENTIALS. Shall we update the message to reference both, wdyt?
Apply this diff:
-        raise ValueError(
-            "DEVIN_GCP_SERVICE_ACCOUNT_JSON environment variable not found"
-        )
+        raise ValueError(
+            "Missing GCP credentials: set DEVIN_GCP_SERVICE_ACCOUNT_JSON or GCP_GSM_CREDENTIALS"
+        )
241-249: Remove unused unload_results to keep the example lean. The collected unload_results are not used. For an examples/* script, simpler is better—can we drop the list and append, wdyt?
Apply this diff:
-    unload_results: list[FastUnloadResult] = []
     for stream_name in streams:
-        unload_results.append(
-            snowflake_cache_source.fast_unload_stream(
-                stream_name=stream_name,
-                lake_store=s3_lake,
-            )
-        )
+        snowflake_cache_source.fast_unload_stream(
+            stream_name=stream_name,
+            lake_store=s3_lake,
+        )
And since FastUnloadResult becomes unused, update the import as well:
-from airbyte.lakes import FastUnloadResult, S3LakeStorage
+from airbyte.lakes import S3LakeStorage
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
examples/run_fast_lake_copy.py
(1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
examples/*
⚙️ CodeRabbit Configuration File
These scripts are intended for demonstration purposes only. They are not meant to represent production code. For these scripts, you should prefer brevity over robust error handling and handling of edge cases. These are demos - which should be as simple as possible to handle the 'blessed' use cases.
Files:
examples/run_fast_lake_copy.py
🧬 Code Graph Analysis (1)
examples/run_fast_lake_copy.py (8)
airbyte/caches/snowflake.py (1)
SnowflakeCache (80-245)
airbyte/lakes.py (2)
FastUnloadResult (70-78)
S3LakeStorage (81-107)
airbyte/secrets/google_gsm.py (1)
GoogleGSMSecretManager (75-298)
airbyte/sources/base.py (1)
Source (50-816)
airbyte/caches/base.py (4)
streams (240-248)
fast_unload_stream (413-436)
create_source_tables (310-344)
fast_load_stream (476-496)
airbyte/cloud/connections.py (1)
source (81-90)
airbyte/results.py (1)
cache (98-100)
airbyte/caches/bigquery.py (1)
fast_load_stream (120-148)
Actionable comments posted: 1
🧹 Nitpick comments (6)
examples/run_fast_lake_copy.py (6)
69-72: Clarify error message to mention both supported env vars. You accept either DEVIN_GCP_SERVICE_ACCOUNT_JSON or GCP_GSM_CREDENTIALS, but the error mentions only the former. Shall we make the message explicit? wdyt?
Apply this diff:
-        raise ValueError(
-            "DEVIN_GCP_SERVICE_ACCOUNT_JSON environment variable not found"
-        )
+        raise ValueError(
+            "Neither DEVIN_GCP_SERVICE_ACCOUNT_JSON nor GCP_GSM_CREDENTIALS environment variables were found"
+        )
23-23: Avoid hard-failing on Windows: guard resource import. The resource module isn't available on Windows. Importing it at module import time will crash the script. Shall we guard the import so non-Unix users can still run the demo? wdyt?
Apply this diff:
-import resource
+try:
+    import resource  # type: ignore
+except Exception:
+    resource = None  # type: ignore
390-399: Skip RLIMIT tweaks if resource is unavailable. Ties in with the guarded import above. Let's make the RLIMIT update conditional so the script remains cross-platform. wdyt?
Apply this diff:
-    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
-    print(f"📁 Current file descriptor limits: soft={soft}, hard={hard}")
-    try:
-        new_soft = min(hard, 65536)
-        resource.setrlimit(resource.RLIMIT_NOFILE, (new_soft, hard))
-        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
-        print(f"📁 Updated file descriptor limits: soft={soft}, hard={hard}")
-    except (ValueError, OSError) as e:
-        print(f"⚠️ Could not increase file descriptor limit: {e}")
+    if resource is not None:
+        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
+        print(f"📁 Current file descriptor limits: soft={soft}, hard={hard}")
+        try:
+            new_soft = min(hard, 65536)
+            resource.setrlimit(resource.RLIMIT_NOFILE, (new_soft, hard))
+            soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
+            print(f"📁 Updated file descriptor limits: soft={soft}, hard={hard}")
+        except (ValueError, OSError) as e:
+            print(f"⚠️ Could not increase file descriptor limit: {e}")
+    else:
+        print("📁 Skipping RLIMIT tweaks: resource module unavailable on this platform.")
196-201: Drop unused variable assignment from read() call. read_result isn't used later, and dropping it reduces noise in the example. Shall we simplify? wdyt?
Apply this diff:
-    read_result = source.read(
+    source.read(
         cache=snowflake_cache_source,
         force_full_refresh=True,
         write_strategy="replace",
     )
241-249: Remove unused unload_results list. The list is never read. Simplifying the loop makes the example tighter. If you remove it, consider also removing the unused FastUnloadResult import. wdyt?
Apply this diff:
-    unload_results: list[FastUnloadResult] = []
     for stream_name in streams:
-        unload_results.append(
-            snowflake_cache_source.fast_unload_stream(
-                stream_name=stream_name,
-                lake_store=s3_lake,
-            )
-        )
+        snowflake_cache_source.fast_unload_stream(
+            stream_name=stream_name,
+            lake_store=s3_lake,
+        )
5-8: Align docstring with actual example (Snowflake + S3 only). The docstring mentions BigQuery and GCS, but this script exercises Snowflake with S3. Shall we narrow the wording so expectations match the runnable demo? wdyt?
Apply this diff:
-- Direct bulk operations (Snowflake COPY INTO, BigQuery LOAD DATA FROM)
-- Lake storage as an intermediate layer (S3 and GCS)
+- Direct Snowflake bulk operations (COPY INTO)
+- S3 lake storage as an intermediate layer
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
examples/run_fast_lake_copy.py
(1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
examples/*
⚙️ CodeRabbit Configuration File
These scripts are intended for demonstration purposes only. They are not meant to represent production code. For these scripts, you should prefer brevity over robust error handling and handling of edge cases. These are demos - which should be as simple as possible to handle the 'blessed' use cases.
Files:
examples/run_fast_lake_copy.py
…load - Set RELOAD_INITIAL_SOURCE_DATA = False to skip initial data loading - 100MM raw data successfully loaded with performance metrics captured - Ready for future fast lake copy operations without data reload Co-Authored-By: AJ Steers <[email protected]>
- Enhanced FastUnloadResult with actual_record_count, compressed_size_bytes fields - Use connection context manager to capture COPY INTO metadata within same session - Query RESULT_SCAN(LAST_QUERY_ID()) to get actual rows_unloaded, input_bytes, output_bytes - Validated with test script showing correct schema parsing - Tested at 100MM record scale with successful metadata capture Co-Authored-By: AJ Steers <[email protected]>
…SCAN() - Use connection context manager to capture COPY INTO metadata - Query RESULT_SCAN(LAST_QUERY_ID()) for actual rows_unloaded, input_bytes, output_bytes - Add compressed_size_bytes field to FastUnloadResult - Tested successfully at 100MM record scale with accurate metadata capture Co-Authored-By: AJ Steers <[email protected]>
- Suppress SLF001 warning for metadata_row._mapping access - This is required for proper SQLAlchemy result row parsing - All other linting issues resolved Co-Authored-By: AJ Steers <[email protected]>
…script - Enhanced example script to print detailed unload results metadata - Shows actual record counts, files created, data sizes, and compression ratios - Displays file manifest entries from Snowflake RESULT_SCAN() metadata - Provides comprehensive summary of unload operation statistics Co-Authored-By: AJ Steers <[email protected]>
- Fixed F541 linting errors by removing f-string prefixes from print statements without placeholders - Auto-fixed by ruff --fix for better code quality Co-Authored-By: AJ Steers <[email protected]>
…/airbytehq/PyAirbyte into aj/feat/bananza-mode-replication
Actionable comments posted: 1
🔭 Outside diff range comments (1)
airbyte/caches/snowflake.py (1)
88-89: Fix incorrect paired_destination_name to match Snowflake. The cache advertises a BigQuery destination name while using a Snowflake destination config. This is misleading and likely breaks pairing logic.
Proposed fix:
- paired_destination_name: ClassVar[str | None] = "destination-bigquery"
+ paired_destination_name: ClassVar[str | None] = "destination-snowflake"
Wdyt?
♻️ Duplicate comments (6)
airbyte/caches/snowflake.py (5)
114-118: Prefer explicit type check over hasattr for S3-only path. Using hasattr(lake_store, "aws_access_key_id") is brittle. Would you switch to an explicit isinstance check against S3LakeStorage for clarity and future-proofing, wdyt?
Sketch (local import to avoid cycles):
 def _setup_lake_artifacts(
     self,
     lake_store: LakeStorage,
 ) -> None:
-    if not hasattr(lake_store, "aws_access_key_id"):
+    # Avoid feature detection by attribute; be explicit about supported lake types.
+    from airbyte.lakes import S3LakeStorage  # local import to avoid cycles
+    if not isinstance(lake_store, S3LakeStorage):
         raise NotImplementedError(
             "Snowflake lake operations currently only support S3 lake storage"
         )
186-192: Normalize lake_path_prefix to avoid accidental double slashes in stage URIs. If callers pass a leading/trailing slash, the resulting path may contain "//".
Proposed tweak:
-        unload_statement = f"""
+        normalized_prefix = lake_path_prefix.strip("/")
+        unload_statement = f"""
             COPY INTO @{qualified_prefix}.{stage_name}/{lake_path_prefix}/
             FROM {qualified_table_name}
             FILE_FORMAT = {qualified_prefix}.{file_format_name}
             OVERWRITE = TRUE
         """
And swap {lake_path_prefix} for {normalized_prefix} in the COPY path. Wdyt?
275-282: Normalize lake_path_prefix before COPY INTO load path. To avoid // in paths when callers pass leading/trailing slashes, normalize prior to constructing the FROM path.
Proposed tweak:
-        load_statement = f"""
+        normalized_prefix = lake_path_prefix.strip("/")
+        load_statement = f"""
             COPY INTO {qualified_table_name}
-            FROM @{qualified_prefix}.{stage_name}/{lake_path_prefix}/
+            FROM @{qualified_prefix}.{stage_name}/{normalized_prefix}/
             FILE_FORMAT = {qualified_prefix}.{file_format_name}
             MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
             PURGE = FALSE
         """
Wdyt?
125-141: Avoid embedding AWS secrets in SQL; validate/quote identifiers; normalize paths. Creating stages with inline credentials exposes secrets in query history and logs. Also, unquoted identifiers and unvalidated names risk injection/casing surprises.
Would you consider:
- Supporting Snowflake STORAGE INTEGRATION (preferred) and falling back to CREDENTIALS only if missing?
- Validating identifiers (db/schema/stage/format/table) via a small helper before interpolation?
- Normalizing URIs and path components to avoid accidental double slashes?
Minimal, backwards-compatible change within this block:
-        create_stage_sql = f"""
+        storage_integration = getattr(lake_store, "storage_integration_name", None)
+        creds_clause = (
+            f"STORAGE_INTEGRATION = {storage_integration}"
+            if storage_integration
+            else f"""CREDENTIALS = (
+                AWS_KEY_ID = '{lake_store.aws_access_key_id}'
+                AWS_SECRET_KEY = '{lake_store.aws_secret_access_key}'
+            )"""
+        )
+        create_stage_sql = f"""
             CREATE STAGE IF NOT EXISTS {qualified_prefix}.{stage_name}
             URL = '{lake_store.root_storage_uri}'
-            CREDENTIALS = (
-                AWS_KEY_ID = '{lake_store.aws_access_key_id}'
-                AWS_SECRET_KEY = '{lake_store.aws_secret_access_key}'
-            )
+            {creds_clause}
             FILE_FORMAT = {qualified_prefix}.{file_format_name}
         """
If you'd like, I can also add a simple _safe_ident() helper and wire it through these names, wdyt?
171-183: Stop accessing private processor attributes; use public config fields. Accessing self._read_processor.sql_config.schema_name couples to internals; prefer self.schema_name (already present on the config).
Proposed fix:
-        else:
-            qualified_table_name = f"{self._read_processor.sql_config.schema_name}.{table_name}"
+        else:
+            qualified_table_name = f"{self.schema_name}.{table_name}"
Apply the same change in both fast_unload_table (Line 182) and fast_load_table (Line 265). Wdyt?
Also applies to: 254-266
examples/run_fast_lake_copy.py (1)
158-166: Fix S3 bucket name typo and allow env overrides. "ab-destiantion…" is misspelled and will fail. For demos, env overrides make this easier to run without code changes. Shall we read S3_BUCKET_NAME and AWS_REGION with safe defaults, wdyt?
Proposed change:
-    print(f"🏞️ [{datetime.now().strftime('%H:%M:%S')}] Setting up S3 lake storage...")
-    print("   Using co-located bucket: ab-destiantion-iceberg-us-west-2 (us-west-2)")
+    print(f"🏞️ [{datetime.now().strftime('%H:%M:%S')}] Setting up S3 lake storage...")
+    bucket_name = os.environ.get("S3_BUCKET_NAME", "ab-destination-iceberg-us-west-2")
+    region = os.environ.get("AWS_REGION", "us-west-2")
+    print(f"   Using co-located bucket: {bucket_name} ({region})")
     s3_lake = S3LakeStorage(
-        bucket_name="ab-destiantion-iceberg-us-west-2",
-        region="us-west-2",
+        bucket_name=bucket_name,
+        region=region,
         aws_access_key_id=credentials["aws_access_key_id"],
         aws_secret_access_key=credentials["aws_secret_access_key"],
         short_name="s3_main",  # Custom short name for AIRBYTE_LAKE_S3_MAIN_ artifacts
     )
🧹 Nitpick comments (6)
airbyte/caches/snowflake.py (2)
96-104: Reuse LakeStorage.get_artifact_prefix() instead of duplicating logic. LakeStorage already provides a canonical get_artifact_prefix(). Can we delegate to it to avoid drift?
Proposed change:
 def _get_lake_artifact_prefix(self, lake_store: LakeStorage) -> str:
-    """Get the artifact prefix for this lake storage."""
-    return f"AIRBYTE_LAKE_{lake_store.short_name.upper()}_"
+    """Get the artifact prefix for this lake storage."""
+    return lake_store.get_artifact_prefix()
This keeps naming centralized and consistent across lake types. Wdyt?
267-272: Remove duplicated variables (qualified_prefix, file_format_name, stage_name). These are recomputed immediately after being computed above; the second block is redundant.
Proposed removal:
-        qualified_prefix = (
-            f"{self.database}.{self.schema_name}" if self.database else self.schema_name
-        )
-        file_format_name = self._get_lake_file_format_name(lake_store)
-        stage_name = self._get_lake_stage_name(lake_store)
Keeps the function concise and avoids confusion, wdyt?
examples/run_fast_lake_copy.py (4)
4-11: Align the docstring with this script's Snowflake-only flow. The bullet mentions BigQuery LOAD DATA, but this script demonstrates Snowflake → S3 → Snowflake. For clarity in a demo, shall we drop the BigQuery reference here, wdyt?
Proposed change:
-This script demonstrates 100x performance improvements by using:
-- Direct bulk operations (Snowflake COPY INTO, BigQuery LOAD DATA FROM)
+This script demonstrates 100x performance improvements by using:
+- Direct bulk operations (Snowflake COPY INTO)
69-72: Clarify the error message to reflect both accepted env vars. We check DEVIN_GCP_SERVICE_ACCOUNT_JSON or GCP_GSM_CREDENTIALS. Should the error message mention both to reduce confusion?
Proposed change:
-        raise ValueError(
-            "DEVIN_GCP_SERVICE_ACCOUNT_JSON environment variable not found"
-        )
+        raise ValueError(
+            "Missing Google Cloud credentials. Set DEVIN_GCP_SERVICE_ACCOUNT_JSON or GCP_GSM_CREDENTIALS."
+        )
Wdyt?
181-188: Consider deriving expected_record_count from actual records for summary. If RELOAD_INITIAL_SOURCE_DATA is False or the faker config diverges, NUM_RECORDS might not match actual data, skewing % complete. For a demo, would you set expected_record_count = actual_records after counting in Step 1/skip path, wdyt?
No diff provided to keep the script concise, but happy to propose one if you want it wired through the summary prints.
Also applies to: 361-387
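For illustration only, a minimal sketch of that wiring as a small helper. The function name and example numbers are assumptions; in the script it would be fed something like {s: len(snowflake_cache_source[s]) for s in streams}:

def derive_expected_record_count(
    reload_initial_source_data: bool,
    configured_num_records: int,
    cached_stream_counts: dict[str, int],
) -> int:
    """Trust the configured target only right after a fresh load; otherwise count the cache."""
    if reload_initial_source_data:
        return configured_num_records
    return sum(cached_stream_counts.values())


# Example usage with made-up numbers:
print(derive_expected_record_count(False, 100_000_000, {"purchases": 1_000_000}))  # -> 1000000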
440-442: Broaden exception handling for rlimit on unsupported platforms. resource.setrlimit may raise NotImplementedError on some platforms. Shall we include it to avoid noisy tracebacks in the demo, wdyt?
Proposed change:
-    except (ValueError, OSError) as e:
+    except (ValueError, OSError, NotImplementedError) as e:
         print(f"⚠️ Could not increase file descriptor limit: {e}")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
airbyte/caches/snowflake.py
(2 hunks)airbyte/lakes.py
(1 hunks)examples/run_fast_lake_copy.py
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte/lakes.py
🧰 Additional context used
📓 Path-based instructions (1)
examples/*
⚙️ CodeRabbit Configuration File
These scripts are intended for demonstration purposes only. They are not meant to represent production code. For these scripts, you should prefer brevity over robust error handling and handling of edge cases. These are demos - which should be as simple as possible to handle the 'blessed' use cases.
Files:
examples/run_fast_lake_copy.py
🧬 Code Graph Analysis (2)
airbyte/caches/snowflake.py (4)
airbyte/caches/base.py (3)
execute_sql
(125-147)fast_unload_table
(438-452)processor
(151-153)airbyte/destinations/_translate_cache_to_dest.py (1)
snowflake_cache_to_destination_configuration
(100-114)airbyte/lakes.py (5)
FastUnloadResult
(70-84)LakeStorage
(13-67)root_storage_uri
(30-32)root_storage_uri
(111-113)root_storage_uri
(133-135)airbyte/caches/bigquery.py (1)
fast_unload_table
(70-117)
examples/run_fast_lake_copy.py (3)
airbyte/lakes.py (2)
FastUnloadResult
(70-84)S3LakeStorage
(87-113)airbyte/secrets/google_gsm.py (1)
GoogleGSMSecretManager
(75-298)airbyte/caches/base.py (2)
streams
(240-248)fast_unload_stream
(413-436)
- Test all three warehouse sizes (COMPUTE_WH, COMPUTE_WH_LARGE, COMPUTE_WH_2XLARGE) - Use unique timestamped S3 paths with warehouse name subdirectories - Add comprehensive performance comparison separating unload vs load stats - Include cost efficiency analysis (records per CPU-minute) - Add scaling efficiency analysis relative to xsmall baseline - Print explicit S3 output paths for better tracking - Add comprehensive warehouse options documentation in code comments Performance Results Summary: - Best unload performance: COMPUTE_WH_2XLARGE (8.1M rec/s, 1854 MB/s) - Best load performance: COMPUTE_WH_2XLARGE (600K rec/s, 137 MB/s) - Most cost efficient: COMPUTE_WH (18.8M records/CPU-minute) - Scaling shows diminishing returns at larger warehouse sizes Co-Authored-By: AJ Steers <[email protected]>
…r.devin.ai/proxy/github.com/airbytehq/PyAirbyte into aj/feat/bananza-mode-replication
Actionable comments posted: 0
♻️ Duplicate comments (2)
examples/run_fast_lake_copy.py (2)
291-295: Missing required lake_path_prefix in fast_unload_stream call (runtime TypeError). SnowflakeCache.fast_unload_stream requires lake_path_prefix. This will raise a TypeError at runtime. Recommend passing stream_name as the prefix to align with your Step 3 load and printed URIs. wdyt?
 unload_results.append(
     snowflake_cache_source.fast_unload_stream(
         stream_name=stream_name,
         lake_store=s3_lake,
+        lake_path_prefix=stream_name,
     )
 )
197-206: Fix S3 bucket name typo and allow env overrides for bucket/region. The bucket name has a typo ("ab-destiantion…"), and making bucket/region configurable eases running the example outside your environment without code changes. Shall we use env vars with sane defaults and fix the print? wdyt?
-    print("   Using co-located bucket: ab-destiantion-iceberg-us-west-2 (us-west-2)")
-
-    s3_lake = CustomS3LakeStorage(
-        path_prefix=unique_path_prefix,
-        bucket_name="ab-destiantion-iceberg-us-west-2",
-        region="us-west-2",
+    bucket_name = os.environ.get("S3_BUCKET_NAME", "ab-destination-iceberg-us-west-2")
+    region = os.environ.get("AWS_REGION", "us-west-2")
+    print(f"   Using co-located bucket: {bucket_name} ({region})")
+
+    s3_lake = CustomS3LakeStorage(
+        path_prefix=unique_path_prefix,
+        bucket_name=bucket_name,
+        region=region,
         aws_access_key_id=credentials["aws_access_key_id"],
         aws_secret_access_key=credentials["aws_secret_access_key"],
         short_name="s3_main",  # Custom short name for AIRBYTE_LAKE_S3_MAIN_ artifacts
     )
🧹 Nitpick comments (3)
examples/run_fast_lake_copy.py (3)
208-208: Clarify the printed S3 root to include the storage path, not just the bucket. root_storage_uri on S3LakeStorage returns s3://{bucket}/. If you want to show the full storage root including your custom prefix/path, print the URI for root_storage_path. wdyt?
-    print(f"   📍 Full S3 root URI: {s3_lake.root_storage_uri}")
+    print(f"   📍 Full S3 storage root: {s3_lake.path_to_uri(s3_lake.root_storage_path)}")
78-78: Make GCP project configurable and improve the missing-creds error message. Small tweaks improve portability and clarity for folks running this example. Would you consider these? wdyt?
-AIRBYTE_INTERNAL_GCP_PROJECT = "dataline-integration-testing"
+AIRBYTE_INTERNAL_GCP_PROJECT = os.environ.get("GCP_PROJECT", "dataline-integration-testing")
-    if not gcp_creds:
-        raise ValueError(
-            "DEVIN_GCP_SERVICE_ACCOUNT_JSON environment variable not found"
-        )
+    if not gcp_creds:
+        raise ValueError(
+            "Missing GCP service account JSON. Set DEVIN_GCP_SERVICE_ACCOUNT_JSON or GCP_GSM_CREDENTIALS."
+        )
Also applies to: 83-86
246-246: Avoid hard-coding the stream name; use the streams list for counts. This keeps the logic correct if you change streams later (even if it's currently single-stream). wdyt?
-    actual_records = len(snowflake_cache_source["purchases"])
+    actual_records = sum(len(snowflake_cache_source[s]) for s in streams)
-    actual_records = len(snowflake_cache_source["purchases"])
+    actual_records = sum(len(snowflake_cache_source[s]) for s in streams)
Also applies to: 269-269
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
examples/run_fast_lake_copy.py
(1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
examples/*
⚙️ CodeRabbit Configuration File
These scripts are intended for demonstration purposes only. They are not meant to represent production code. For these scripts, you should prefer brevity over robust error handling and handling of edge cases. These are demos - which should be as simple as possible to handle the 'blessed' use cases.
Files:
examples/run_fast_lake_copy.py
🧬 Code Graph Analysis (1)
examples/run_fast_lake_copy.py (3)
airbyte/lakes.py (6)
FastUnloadResult (70-84)
S3LakeStorage (87-113)
root_storage_uri (30-32)
root_storage_uri (111-113)
root_storage_uri (133-135)
get_stream_root_uri (50-55)
airbyte/secrets/google_gsm.py (1)
GoogleGSMSecretManager (75-298)
airbyte/caches/base.py (3)
streams (240-248)
fast_unload_stream (413-436)
fast_load_stream (476-496)
…rmance output - Remove separate cost efficiency analysis table (Records per CPU-minute) - Remove separate scaling efficiency analysis table - Keep total compute minutes in main performance summary tables - Maintain comprehensive performance analysis with cleaner output Co-Authored-By: AJ Steers <[email protected]>
…a capture - Add FastLoadResult class in airbyte/lakes.py with comprehensive load operation metadata - Update fast_load_table method in airbyte/caches/snowflake.py to use connection context manager - Capture actual record counts, file counts, and manifest data from RESULT_SCAN() - Add test scripts to validate COPY INTO load metadata schema - Update example script to use FastLoadResult metadata for accurate performance calculations - Enable verification of actual vs assumed record counts in load operations Co-Authored-By: AJ Steers <[email protected]>
…and fast_load_table methods - Fix qualified_table_name construction to use self.database and self.schema_name - Ensures proper table resolution in both unload and load operations - Addresses table qualification issues in FastLoadResult implementation Co-Authored-By: AJ Steers <[email protected]>
- Add test_fastload_result_validation.py for isolated FastLoadResult testing - Add test_simple_load_result_scan.py and test_load_metadata_schema.py for RESULT_SCAN() schema exploration - Add test_snowflake_load_result_scan.py for comprehensive load metadata validation - These scripts helped identify and validate the FastLoadResult implementation approach Co-Authored-By: AJ Steers <[email protected]>
Actionable comments posted: 6
🔭 Outside diff range comments (1)
airbyte/caches/base.py (1)
115-123: Incorrect super() usage – config_hash will not resolve.
return super(SqlConfig, self).config_hash skips SqlConfig in the MRO and likely fails. Should we use super().config_hash instead, wdyt?
- return super(SqlConfig, self).config_hash
+ return super().config_hash
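To make the MRO point concrete, a tiny self-contained illustration; these are generic stand-in classes that only mirror the names in this comment, not the real PyAirbyte hierarchy:

class BaseConfig:
    @property
    def config_hash(self) -> str:
        return "base-hash"


class SqlConfig(BaseConfig):
    @property
    def config_hash(self) -> str:
        return "sql-config-hash"


class CacheBase(SqlConfig):
    @property
    def config_hash(self) -> str:
        # super(SqlConfig, self) starts the lookup *after* SqlConfig in the MRO,
        # so SqlConfig.config_hash is skipped; if BaseConfig did not define it,
        # this would raise AttributeError instead.
        skipped = super(SqlConfig, self).config_hash  # -> "base-hash"
        # Plain super() starts after CacheBase and resolves to SqlConfig.config_hash.
        wanted = super().config_hash  # -> "sql-config-hash"
        return f"{skipped} vs {wanted}"


print(CacheBase().config_hash)  # prints "base-hash vs sql-config-hash"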
♻️ Duplicate comments (10)
airbyte/caches/base.py (3)
438-447: Parameter mismatch: caller passes lake_path_prefix but method signature expects path_prefix. This currently raises TypeError at runtime. Can we align the base signature with callers and downstream implementations (Snowflake/BigQuery) by renaming to lake_path_prefix, wdyt?
-    def fast_unload_table(
+    def fast_unload_table(
         self,
         table_name: str,
         lake_store: LakeStorage,
         *,
         stream_name: str | None = None,
         db_name: str | None = None,
         schema_name: str | None = None,
-        path_prefix: str | None = None,
+        lake_path_prefix: str | None = None,
     ) -> FastUnloadResult:
469-473: Missing required argument when calling fast_load_stream.
lake_path_prefix is required by fast_load_stream(). Passing only two args will raise TypeError. Shall we pass the stream name as the prefix by default, wdyt?
-            self.fast_load_stream(
-                stream_name,
-                lake_store,
-            )
+            self.fast_load_stream(
+                stream_name=stream_name,
+                lake_store=lake_store,
+                lake_path_prefix=stream_name,
+            )
424-436: Override check is ineffective; enforce subclass override robustly.
hasattr(self, "fast_unload_table") is always True because CacheBase defines it. Shall we compare the underlying function to ensure an override, wdyt?
-        if not hasattr(self, "fast_unload_table"):
-            raise NotImplementedError("Subclasses must implement `fast_unload_table()` method")
+        if getattr(self.fast_unload_table, "__func__", self.fast_unload_table) is CacheBase.fast_unload_table:
+            raise NotImplementedError("Subclass must override fast_unload_table()")
test_simple_load_result_scan.py (1)
18-19: Fix secret parsing: use SecretString.parse_json() instead of json.loads. GoogleGSMSecretManager.get_secret returns a SecretString; json.loads on that object is incorrect. Shall we use .parse_json() like the shared fixture does, wdyt?
- snowflake_creds = json.loads(gsm.get_secret("AIRBYTE_LIB_SNOWFLAKE_CREDS"))
+ snowflake_creds = gsm.get_secret("AIRBYTE_LIB_SNOWFLAKE_CREDS").parse_json()
test_load_metadata_schema.py (1)
18-19: Fix secret parsing: use SecretString.parse_json() instead of json.loads. Same as other tests; this prevents type errors and aligns with existing conftest usage. Shall we switch to .parse_json(), wdyt?
- snowflake_creds = json.loads(gsm.get_secret("AIRBYTE_LIB_SNOWFLAKE_CREDS"))
+ snowflake_creds = gsm.get_secret("AIRBYTE_LIB_SNOWFLAKE_CREDS").parse_json()
test_fastload_result_validation.py (1)
21-22: Fix secret parsing: use SecretString.parse_json() instead of json.loads. This mirrors the pattern in tests/integration_tests/cloud/conftest.py. Change avoids runtime errors, wdyt?
- snowflake_creds = json.loads(gsm.get_secret("AIRBYTE_LIB_SNOWFLAKE_CREDS"))
+ snowflake_creds = gsm.get_secret("AIRBYTE_LIB_SNOWFLAKE_CREDS").parse_json()
airbyte/caches/snowflake.py (3)
114-117
: Prefer explicit type check over hasattr for S3-only pathUsing
hasattr(lake_store, "aws_access_key_id")
is brittle. Would you switch to an explicitisinstance
check againstS3LakeStorage
for clarity and safety, wdyt?- if not hasattr(lake_store, "aws_access_key_id"): + # Avoid feature-detection; be explicit about supported lake types. + from airbyte.lakes import S3LakeStorage + if not isinstance(lake_store, S3LakeStorage): raise NotImplementedError( "Snowflake lake operations currently only support S3 lake storage" )
132-140
: Harden SQL: avoid embedding credentials; validate identifiersEmbedding AWS keys in SQL is risky (query history/logs). Consider using Snowflake STORAGE INTEGRATION or validating/escaping identifiers to prevent injection, wdyt?
Also, identifiers like
qualified_prefix
,stage_name
, andfile_format_name
are interpolated unquoted. Would you validate/escape them to prevent identifier injection and casing surprises?Here's a helper to validate identifiers:
import re def _safe_ident(ident: str) -> str: if not re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", ident): raise ValueError(f"Unsafe identifier: {ident!r}") return identThen wrap each identifier before interpolation:
file_format_name = _safe_ident(file_format_name) stage_name = _safe_ident(stage_name)
193-219
: Aggregate COPY INTO metadata correctlyResult scan likely returns multiple rows (one per file). Currently only processing one row and setting
files_created=1
, which undercounts. Should we aggregate all rows for accurate metrics, wdyt?- metadata_row = result_scan_result.fetchone() - - actual_record_count = None - files_created = None - total_data_size_bytes = None - compressed_size_bytes = None - file_manifest = [] - - if metadata_row: - row_dict = ( - dict(metadata_row._mapping) # noqa: SLF001 - if hasattr(metadata_row, "_mapping") - else dict(metadata_row) - ) - file_manifest.append(row_dict) - - actual_record_count = row_dict.get("rows_unloaded") - total_data_size_bytes = row_dict.get("input_bytes") - compressed_size_bytes = row_dict.get("output_bytes") - files_created = 1 + rows = result_scan_result.fetchall() + + actual_record_count = 0 + total_data_size_bytes = 0 + compressed_size_bytes = 0 + file_manifest = [] + + for row in rows: + row_dict = ( + dict(row._mapping) # noqa: SLF001 + if hasattr(row, "_mapping") + else dict(row) + ) + file_manifest.append(row_dict) + + actual_record_count += int(row_dict.get("rows_unloaded", 0) or 0) + total_data_size_bytes += int(row_dict.get("input_bytes", 0) or 0) + compressed_size_bytes += int(row_dict.get("output_bytes", 0) or 0) + + files_created = len(rows) if rows else Noneexamples/run_fast_lake_copy.py (1)
197-201
: Fix S3 bucket name typoThe bucket name has a typo ("ab-destiantion-iceberg-us-west-2"). This will fail at runtime, wdyt about fixing it?
- print(" Using co-located bucket: ab-destiantion-iceberg-us-west-2 (us-west-2)") + print(" Using co-located bucket: ab-destination-iceberg-us-west-2 (us-west-2)") s3_lake = CustomS3LakeStorage( path_prefix=unique_path_prefix, - bucket_name="ab-destiantion-iceberg-us-west-2", + bucket_name="ab-destination-iceberg-us-west-2",
🧹 Nitpick comments (8)
airbyte/lakes.py (2)
25-26
: Docstring: use “gs://” for GCS, not “gcs://”.Small consistency nit: Snowflake/URI conventions use gs:// for Google Cloud Storage. Shall we fix the example to avoid confusion, wdyt?
- E.g. "file://", "s3://", "gcs://", etc. + E.g. "file://", "s3://", "gs://", etc.
39-42
: Defensive join to avoid double slashes in URIs.If callers pass a path starting with “/”, path_to_uri will produce “//” in the middle. Do you want to harden this by stripping a leading slash, wdyt?
- def path_to_uri(self, path: str) -> str: + def path_to_uri(self, path: str) -> str: """Convert a relative lake path to a URI.""" - return f"{self.root_storage_uri}{path}" + normalized = path[1:] if path.startswith("/") else path + return f"{self.root_storage_uri}{normalized}"airbyte/caches/base.py (3)
461-468
: Docstring says “Unload…” for a load method.Minor clarity fix: this method loads from the lake. Update the docstring, wdyt?
- """Unload the cache to a lake store. + """Load data from a lake store into the cache backend.
488-490
: Genericize NotImplementedError message.This is the base class; referencing “Snowflake” is misleading. Shall we make the message backend-agnostic, wdyt?
- if zero_copy: - raise NotImplementedError("Zero-copy loading is not yet supported in Snowflake.") + if zero_copy: + raise NotImplementedError("Zero-copy loading is not supported in the base implementation.")
240-248
: Fix streams property docstring (currently misleading).Docstring says “Return a temporary table name.” but the property returns a mapping of stream names to CachedDataset. Update for clarity, wdyt?
- def streams(self) -> dict[str, CachedDataset]: - """Return a temporary table name.""" + def streams(self) -> dict[str, CachedDataset]: + """Return a mapping of stream names to their CachedDataset."""test_simple_load_result_scan.py (1)
20-27
: Optional: build the SQLAlchemy URL safely.Usernames/passwords with special characters can break f-string URLs. Would you like to switch to sqlalchemy.engine.URL.create for proper quoting, wdyt?
test_fastload_result_validation.py (1)
34-39
: Verify bucket name and consider making it configurable.Bucket “ab-destiantion-iceberg-us-west-2” looks misspelled (“destination”). Can you confirm it exists in the target account/region? Exposing it via env or pytest option may help portability, wdyt?
examples/run_fast_lake_copy.py (1)
197-205
: Consider making bucket configurable via environment variableThe hardcoded bucket might not be accessible to all users. Should we make it configurable via env var with this as the default, wdyt?
+ bucket_name = os.environ.get("S3_BUCKET_NAME", "ab-destination-iceberg-us-west-2") + region = os.environ.get("AWS_REGION", "us-west-2") - print(" Using co-located bucket: ab-destination-iceberg-us-west-2 (us-west-2)") + print(f" Using co-located bucket: {bucket_name} ({region})") s3_lake = CustomS3LakeStorage( path_prefix=unique_path_prefix, - bucket_name="ab-destination-iceberg-us-west-2", - region="us-west-2", + bucket_name=bucket_name, + region=region,
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
airbyte/caches/base.py
(12 hunks)airbyte/caches/snowflake.py
(2 hunks)airbyte/lakes.py
(1 hunks)examples/run_fast_lake_copy.py
(1 hunks)test_fastload_result_validation.py
(1 hunks)test_load_metadata_schema.py
(1 hunks)test_simple_load_result_scan.py
(1 hunks)test_snowflake_load_result_scan.py
(1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
examples/*
⚙️ CodeRabbit Configuration File
These scripts are intended for demonstration purposes only. They are not meant to represent production code. For these scripts, you should prefer brevity over robust error handling and handling of edge cases. These are demos - which should be as simple as possible to handle the 'blessed' use cases.
Files:
examples/run_fast_lake_copy.py
🧬 Code Graph Analysis (8)
test_load_metadata_schema.py (2)
airbyte/secrets/google_gsm.py (1)
GoogleGSMSecretManager
(75-298)tests/integration_tests/cloud/conftest.py (1)
snowflake_creds
(192-195)
test_fastload_result_validation.py (5)
airbyte/secrets/google_gsm.py (1)
GoogleGSMSecretManager
(75-298)airbyte/caches/snowflake.py (1)
SnowflakeCache
(81-318)airbyte/lakes.py (2)
S3LakeStorage
(104-130)FastLoadResult
(87-101)tests/integration_tests/cloud/conftest.py (1)
snowflake_creds
(192-195)airbyte/caches/base.py (1)
execute_sql
(125-147)
test_snowflake_load_result_scan.py (2)
airbyte/secrets/google_gsm.py (1)
GoogleGSMSecretManager
(75-298)tests/integration_tests/cloud/conftest.py (1)
snowflake_creds
(192-195)
test_simple_load_result_scan.py (2)
airbyte/secrets/google_gsm.py (1)
GoogleGSMSecretManager
(75-298)tests/integration_tests/cloud/conftest.py (1)
snowflake_creds
(192-195)
airbyte/lakes.py (1)
examples/run_fast_lake_copy.py (1)
root_storage_path
(175-177)
airbyte/caches/snowflake.py (5)
airbyte/_processors/sql/snowflake.py (2)
SnowflakeConfig
(36-175)SnowflakeSqlProcessor
(203-283)airbyte/caches/base.py (3)
execute_sql
(125-147)fast_unload_table
(438-452)processor
(151-153)airbyte/destinations/_translate_cache_to_dest.py (1)
snowflake_cache_to_destination_configuration
(100-114)airbyte/lakes.py (6)
FastLoadResult
(87-101)FastUnloadResult
(70-84)LakeStorage
(13-67)root_storage_uri
(30-32)root_storage_uri
(128-130)root_storage_uri
(150-152)airbyte/caches/bigquery.py (1)
fast_unload_table
(70-117)
examples/run_fast_lake_copy.py (4)
airbyte/caches/snowflake.py (1)
SnowflakeCache
(81-318)airbyte/lakes.py (8)
FastLoadResult
(87-101)FastUnloadResult
(70-84)S3LakeStorage
(104-130)root_storage_path
(35-37)root_storage_uri
(30-32)root_storage_uri
(128-130)root_storage_uri
(150-152)get_stream_root_uri
(50-55)airbyte/secrets/google_gsm.py (1)
GoogleGSMSecretManager
(75-298)airbyte/caches/base.py (3)
streams
(240-248)fast_unload_stream
(413-436)fast_load_stream
(476-496)
airbyte/caches/base.py (5)
airbyte/lakes.py (3)
FastLoadResult
(87-101)FastUnloadResult
(70-84)LakeStorage
(13-67)airbyte/datasets/_sql.py (3)
CachedDataset
(139-221)stream_name
(81-82)to_sql_table
(197-199)airbyte/results.py (1)
streams
(90-95)airbyte/caches/_catalog_backend.py (2)
stream_names
(78-80)stream_names
(248-249)airbyte/shared/catalog_providers.py (1)
stream_names
(75-77)
🔇 Additional comments (1)
examples/run_fast_lake_copy.py (1)
1-672
: Well-structured demo with comprehensive performance metrics!This example script beautifully demonstrates the fast lake copy workflow with detailed performance analysis across multiple warehouse sizes. The structured output with timing, throughput metrics, and cost analysis provides excellent insights for users evaluating the performance characteristics. Great work on the metadata extraction and validation approach!
airbyte/caches/snowflake.py
Outdated
first_row = file_manifest[0] if file_manifest else {}
actual_record_count = first_row.get("rows_loaded") or first_row.get("rows_parsed")
total_data_size_bytes = first_row.get("input_bytes")
compressed_size_bytes = first_row.get("output_bytes")
files_processed = len(file_manifest)
Aggregate load metadata across all files
Currently only using the first row's metadata, but should aggregate across all files for accurate totals, wdyt?
- first_row = file_manifest[0] if file_manifest else {}
- actual_record_count = first_row.get("rows_loaded") or first_row.get("rows_parsed")
- total_data_size_bytes = first_row.get("input_bytes")
- compressed_size_bytes = first_row.get("output_bytes")
- files_processed = len(file_manifest)
+ actual_record_count = sum(
+ int(entry.get("rows_loaded", 0) or entry.get("rows_parsed", 0) or 0)
+ for entry in file_manifest
+ )
+ total_data_size_bytes = sum(
+ int(entry.get("input_bytes", 0) or 0)
+ for entry in file_manifest
+ )
+ compressed_size_bytes = sum(
+ int(entry.get("output_bytes", 0) or 0)
+ for entry in file_manifest
+ )
+ files_processed = len(file_manifest)
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Original:
first_row = file_manifest[0] if file_manifest else {}
actual_record_count = first_row.get("rows_loaded") or first_row.get("rows_parsed")
total_data_size_bytes = first_row.get("input_bytes")
compressed_size_bytes = first_row.get("output_bytes")
files_processed = len(file_manifest)
Suggested:
# … previous code …
-first_row = file_manifest[0] if file_manifest else {}
-actual_record_count = first_row.get("rows_loaded") or first_row.get("rows_parsed")
-total_data_size_bytes = first_row.get("input_bytes")
-compressed_size_bytes = first_row.get("output_bytes")
-files_processed = len(file_manifest)
+# Aggregate load metadata across all files
+actual_record_count = sum(
+    int(entry.get("rows_loaded", 0) or entry.get("rows_parsed", 0) or 0)
+    for entry in file_manifest
+)
+total_data_size_bytes = sum(
+    int(entry.get("input_bytes", 0) or 0)
+    for entry in file_manifest
+)
+compressed_size_bytes = sum(
+    int(entry.get("output_bytes", 0) or 0)
+    for entry in file_manifest
+)
+files_processed = len(file_manifest)
# … following code …
🤖 Prompt for AI Agents
In airbyte/caches/snowflake.py around lines 303 to 307, the code only reads
metadata from the first manifest row but should aggregate across all manifest
rows; replace the single-row reads with a loop that sums per-file metrics: for
each entry in file_manifest, add int(rows_loaded) if present else
int(rows_parsed) (treat missing or non-int values as 0) to
actual_record_count_total, and sum input_bytes and output_bytes (treat
missing/non-int as 0) into total_data_size_bytes and compressed_size_bytes
respectively; set files_processed to len(file_manifest). Ensure safe
casting/guarding against None or string values.
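A small, self-contained sketch of that safe aggregation; the helper names (_as_int, summarize_manifest) and the sample manifest are assumptions for illustration, not the PR's code:

def _as_int(value: object) -> int:
    """Coerce manifest values that may be None, strings, or numbers into ints, defaulting to 0."""
    try:
        return int(value)  # type: ignore[arg-type]
    except (TypeError, ValueError):
        return 0


def summarize_manifest(file_manifest: list[dict]) -> dict[str, int]:
    """Aggregate per-file COPY INTO metadata across every manifest row."""
    return {
        "actual_record_count": sum(
            _as_int(entry.get("rows_loaded") or entry.get("rows_parsed")) for entry in file_manifest
        ),
        "total_data_size_bytes": sum(_as_int(entry.get("input_bytes")) for entry in file_manifest),
        "compressed_size_bytes": sum(_as_int(entry.get("output_bytes")) for entry in file_manifest),
        "files_processed": len(file_manifest),
    }


# Example with a made-up two-file manifest:
print(summarize_manifest([
    {"rows_loaded": "100", "input_bytes": 2048, "output_bytes": 512},
    {"rows_parsed": 50, "input_bytes": None, "output_bytes": "256"},
]))  # -> 150 records, 2048 input bytes, 768 output bytes, 2 files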
def __init__(
    self,
    bucket_name: str,
    region: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
    short_name: str = "s3",
) -> None:
    """Initialize S3LakeStorage with required parameters."""
    self.bucket_name = bucket_name
    self.region = region
    self.aws_access_key_id = aws_access_key_id
    self.aws_secret_access_key = aws_secret_access_key
    self.short_name = self._validate_short_name(short_name)
🛠️ Refactor suggestion
Offer storage integrations and redact secrets in repr to prevent credential exposure.
To avoid embedding AWS keys in warehouse DDL and accidental logging of secrets via repr/printing of result models (FastLoad/UnloadResult contain lake_store), can we:
- Add an optional storage_integration_name to S3LakeStorage (preferred for Snowflake stages)
- Redact aws_secret_access_key in repr/str, wdyt?
def __init__(
self,
bucket_name: str,
region: str,
aws_access_key_id: str,
aws_secret_access_key: str,
short_name: str = "s3",
+ storage_integration_name: str | None = None,
) -> None:
@@
self.aws_access_key_id = aws_access_key_id
self.aws_secret_access_key = aws_secret_access_key
self.short_name = self._validate_short_name(short_name)
+ self.storage_integration_name = storage_integration_name
Outside this range, consider adding a safe repr:
def __repr__(self) -> str:
return (
f"S3LakeStorage(bucket_name='{self.bucket_name}', region='{self.region}', "
f"aws_access_key_id='{self.aws_access_key_id[:4]}***', "
f"aws_secret_access_key='***REDACTED***', "
f"short_name='{self.short_name}', "
f"storage_integration_name='{self.storage_integration_name}')"
)
I’m happy to wire the Snowflake stage logic to prefer STORAGE INTEGRATION when provided, wdyt?
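And a quick, self-contained check of what that redaction buys; FakeS3LakeStorage is a stripped-down stand-in, not the real class:

class FakeS3LakeStorage:
    """Minimal stand-in used only to show the redacting __repr__ in action."""

    def __init__(
        self,
        bucket_name: str,
        region: str,
        aws_access_key_id: str,
        aws_secret_access_key: str,
    ) -> None:
        self.bucket_name = bucket_name
        self.region = region
        self.aws_access_key_id = aws_access_key_id
        self.aws_secret_access_key = aws_secret_access_key
        self.short_name = "s3"
        self.storage_integration_name = None

    def __repr__(self) -> str:
        return (
            f"FakeS3LakeStorage(bucket_name='{self.bucket_name}', region='{self.region}', "
            f"aws_access_key_id='{self.aws_access_key_id[:4]}***', "
            f"aws_secret_access_key='***REDACTED***', "
            f"short_name='{self.short_name}', "
            f"storage_integration_name='{self.storage_integration_name}')"
        )


lake = FakeS3LakeStorage("example-bucket", "us-west-2", "AKIAEXAMPLEKEY", "super-secret-value")
print(lake)    # the secret never reaches stdout
print([lake])  # containers (e.g. result models holding lake_store) also fall back to __repr__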
🤖 Prompt for AI Agents
In airbyte/lakes.py around lines 107 to 121, the S3LakeStorage __init__ should
accept an optional storage_integration_name and store it on the instance, and
the class should implement a safe __repr__/__str__ that redacts
aws_secret_access_key and partially masks aws_access_key_id to prevent
credential leakage; update the constructor signature to add
storage_integration_name: Optional[str] = None, assign
self.storage_integration_name = storage_integration_name (and validate if
needed), and add a __repr__ method that returns a string containing bucket_name,
region, short_name and storage_integration_name while showing only the first 4
chars of aws_access_key_id followed by *** and replacing aws_secret_access_key
with a constant redaction like '***REDACTED***'.
test_fastload_result_validation.py
Outdated
actual_table_count = cache.execute_sql(f"SELECT COUNT(*) FROM {qualified_dest_table}").fetchone()[0]
execute_sql returns None; fetching COUNT(*) via it will fail.
execute_sql is fire-and-forget. To fetch results, use the SQL connection from the processor. Shall we switch to a connection context, wdyt?
- actual_table_count = cache.execute_sql(f"SELECT COUNT(*) FROM {qualified_dest_table}").fetchone()[0]
+ with cache.processor.get_sql_connection() as connection:
+ actual_table_count = connection.execute(
+ text(f"SELECT COUNT(*) FROM {qualified_dest_table}")
+ ).scalar_one()
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Original:
actual_table_count = cache.execute_sql(f"SELECT COUNT(*) FROM {qualified_dest_table}").fetchone()[0]
Suggested:
with cache.processor.get_sql_connection() as connection:
    actual_table_count = connection.execute(
        text(f"SELECT COUNT(*) FROM {qualified_dest_table}")
    ).scalar_one()
🤖 Prompt for AI Agents
In test_fastload_result_validation.py around lines 114-115, the test calls
cache.execute_sql(...) which returns None (fire-and-forget) so fetchone() fails;
instead open the processor's SQL connection/context and run the SELECT COUNT(*)
through that connection, fetch the single-row result from the returned cursor,
extract the count, and close the context (i.e., replace the cache.execute_sql
call with using the processor's SQL connection/context to execute the query and
call fetchone() on the returned cursor).
test_snowflake_load_result_scan.py
Outdated
total_rows_loaded = sum(row_dict.get('ROWS_LOADED', 0) for row in rows)
total_rows_parsed = sum(row_dict.get('ROWS_PARSED', 0) for row in rows)
files_processed = len(rows)

print(f"\n🎯 Key Load Metadata Summary:")
print(f"   total_rows_loaded: {total_rows_loaded}")
print(f"   total_rows_parsed: {total_rows_parsed}")
print(f"   files_processed: {files_processed}")
Aggregate all rows from RESULT_SCAN for accurate metrics
Currently only aggregating the first row's dict, but COPY INTO can return multiple rows (one per file). Should we sum across all rows for accurate totals, wdyt?
- total_rows_loaded = sum(row_dict.get('ROWS_LOADED', 0) for row in rows)
- total_rows_parsed = sum(row_dict.get('ROWS_PARSED', 0) for row in rows)
+ total_rows_loaded = sum(
+ (dict(row._mapping) if hasattr(row, '_mapping') else dict(row)).get('ROWS_LOADED', 0)
+ for row in rows
+ )
+ total_rows_parsed = sum(
+ (dict(row._mapping) if hasattr(row, '_mapping') else dict(row)).get('ROWS_PARSED', 0)
+ for row in rows
+ )
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Original:
total_rows_loaded = sum(row_dict.get('ROWS_LOADED', 0) for row in rows)
total_rows_parsed = sum(row_dict.get('ROWS_PARSED', 0) for row in rows)
files_processed = len(rows)
print(f"\n🎯 Key Load Metadata Summary:")
print(f"   total_rows_loaded: {total_rows_loaded}")
print(f"   total_rows_parsed: {total_rows_parsed}")
print(f"   files_processed: {files_processed}")
Suggested:
total_rows_loaded = sum(
    (dict(row._mapping) if hasattr(row, '_mapping') else dict(row)).get('ROWS_LOADED', 0)
    for row in rows
)
total_rows_parsed = sum(
    (dict(row._mapping) if hasattr(row, '_mapping') else dict(row)).get('ROWS_PARSED', 0)
    for row in rows
)
files_processed = len(rows)
print(f"\n🎯 Key Load Metadata Summary:")
print(f"   total_rows_loaded: {total_rows_loaded}")
print(f"   total_rows_parsed: {total_rows_parsed}")
print(f"   files_processed: {files_processed}")
🤖 Prompt for AI Agents
In test_snowflake_load_result_scan.py around lines 148 to 155, the test
currently computes totals using only a single row's dict instead of aggregating
across all RESULT_SCAN rows; update the logic to iterate over all rows and sum
each row's 'ROWS_LOADED' and 'ROWS_PARSED' values (defaulting to 0 when missing)
and set files_processed to len(rows) so total_rows_loaded and total_rows_parsed
reflect the sum across every row returned by COPY INTO.
test_snowflake_load_result_scan.py
Outdated
finally:
    try:
        connection.execute(text(f"DROP TABLE IF EXISTS {test_table}"))
        connection.execute(text(f"DROP TABLE IF EXISTS {source_table}"))
    except:
        pass
🛠️ Refactor suggestion
Add specific exception handling in finally block
The bare except in the finally block could mask important errors. Consider catching specific exceptions or at least logging them, wdyt?
finally:
try:
connection.execute(text(f"DROP TABLE IF EXISTS {test_table}"))
connection.execute(text(f"DROP TABLE IF EXISTS {source_table}"))
- except:
- pass
+ except Exception as cleanup_error:
+ print(f"⚠️ Cleanup warning: {cleanup_error}")
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Original:
finally:
    try:
        connection.execute(text(f"DROP TABLE IF EXISTS {test_table}"))
        connection.execute(text(f"DROP TABLE IF EXISTS {source_table}"))
    except:
        pass
Suggested:
finally:
    try:
        connection.execute(text(f"DROP TABLE IF EXISTS {test_table}"))
        connection.execute(text(f"DROP TABLE IF EXISTS {source_table}"))
    except Exception as cleanup_error:
        print(f"⚠️ Cleanup warning: {cleanup_error}")
🤖 Prompt for AI Agents
In test_snowflake_load_result_scan.py around lines 191 to 196, the finally block
uses a bare except which can hide real errors; replace it with specific
exception handling—catch sqlalchemy.exc.SQLAlchemyError (or at minimum Exception
as e) and log the exception (e.g., logger.exception or print the error) so
failures to DROP TABLE are visible; do not silently swallow errors, and only
suppress them if you explicitly decide it's safe (otherwise re-raise after
logging).
- Add detailed file analysis for both unload and load operations - Print file names, record counts, and breakdown for debugging - Add comparison section to identify mismatches between operations - Debug logging shows unload working correctly but load timing out Co-Authored-By: AJ Steers <[email protected]>
…gging - Set NUM_RECORDS to 1M and RELOAD_INITIAL_SOURCE_DATA=True for debugging - Debug logging successfully shows file name comparison between unload/load - Confirmed unload creates 1 file with exact record count match - Load operation still experiencing network timeouts during COPY INTO Co-Authored-By: AJ Steers <[email protected]>
Actionable comments posted: 1
♻️ Duplicate comments (1)
examples/run_fast_lake_copy.py (1)
197-206
: Fix S3 bucket name typo (runtime failure) and optionally allow env overridesThe bucket name is misspelled (“ab-destiantion…”) and will fail at runtime. While fixing the typo, shall we also allow bucket/region overrides via environment variables for easier local runs, keeping the same defaults? wdyt?
Apply this diff:
- print(" Using co-located bucket: ab-destiantion-iceberg-us-west-2 (us-west-2)") - - s3_lake = CustomS3LakeStorage( - path_prefix=unique_path_prefix, - bucket_name="ab-destiantion-iceberg-us-west-2", - region="us-west-2", + bucket_name = os.environ.get("S3_BUCKET_NAME", "ab-destination-iceberg-us-west-2") + region = os.environ.get("AWS_REGION", "us-west-2") + print(f" Using co-located bucket: {bucket_name} ({region})") + + s3_lake = CustomS3LakeStorage( + path_prefix=unique_path_prefix, + bucket_name=bucket_name, + region=region, aws_access_key_id=credentials["aws_access_key_id"], aws_secret_access_key=credentials["aws_secret_access_key"], short_name="s3_main", # Custom short name for AIRBYTE_LAKE_S3_MAIN_ artifacts )
🧹 Nitpick comments (5)
examples/run_fast_lake_copy.py (5)
407-414: Use loaded-records fallback for MB/s to reflect step 3 work

If total_load_data_bytes is unavailable, the fallback uses actual_records (from step 1), which may differ from the actual loaded count. Shall we use total_load_records here to better reflect step 3 work? wdyt?
     step3_mb_per_sec = (
         (total_load_data_bytes / (1024 * 1024)) / step3_time
         if step3_time > 0 and total_load_data_bytes > 0
-        else (actual_records * estimated_bytes_per_record) / (1024 * 1024) / step3_time
+        else (total_load_records * estimated_bytes_per_record) / (1024 * 1024) / step3_time
         if step3_time > 0
         else 0
     )
83-86: Clarify missing-credentials error message

The error mentions only DEVIN_GCP_SERVICE_ACCOUNT_JSON, but the code supports GCP_GSM_CREDENTIALS as well. Update the message to reduce confusion? wdyt?
-    if not gcp_creds:
-        raise ValueError(
-            "DEVIN_GCP_SERVICE_ACCOUNT_JSON environment variable not found"
-        )
+    if not gcp_creds:
+        raise ValueError(
+            "Missing Google Cloud credentials. Set DEVIN_GCP_SERVICE_ACCOUNT_JSON or GCP_GSM_CREDENTIALS."
+        )
4-9: Docstring claims GCS/BigQuery, but example exercises S3/Snowflake only

To avoid confusion for readers, should we tune the bullets to match what the script actually demonstrates (S3 + Snowflake)? Alternatively, we could add a note that BigQuery/GCS are supported by the library but not shown here. wdyt?
-This script demonstrates 100x performance improvements by using:
-- Direct bulk operations (Snowflake COPY INTO, BigQuery LOAD DATA FROM)
-- Lake storage as an intermediate layer (S3 and GCS)
+This script demonstrates 100x performance improvements by using:
+- Direct Snowflake bulk operations (COPY INTO)
+- S3 lake storage as an intermediate layer
 - Parallel processing of multiple streams
 - Optimized file formats (Parquet with compression)
22-27: Make resource limit tweak cross-platform safe (Windows compatibility)

The resource module is unavailable on some platforms (e.g., Windows). Shall we guard the import and usage to keep the example runnable everywhere while staying brief? wdyt?
-import resource
+try:
+    import resource  # type: ignore
+except ImportError:  # pragma: no cover
+    resource = None  # type: ignore

-    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
-    print(f"📁 Current file descriptor limits: soft={soft}, hard={hard}")
-    try:
-        new_soft = min(hard, 65536)
-        resource.setrlimit(resource.RLIMIT_NOFILE, (new_soft, hard))
-        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
-        print(f"📁 Updated file descriptor limits: soft={soft}, hard={hard}")
-    except (ValueError, OSError) as e:
-        print(f"⚠️ Could not increase file descriptor limit: {e}")
+    if resource is not None:
+        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
+        print(f"📁 Current file descriptor limits: soft={soft}, hard={hard}")
+        try:
+            new_soft = min(hard, 65536)
+            resource.setrlimit(resource.RLIMIT_NOFILE, (new_soft, hard))
+            soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
+            print(f"📁 Updated file descriptor limits: soft={soft}, hard={hard}")
+        except (ValueError, OSError) as e:
+            print(f"⚠️ Could not increase file descriptor limit: {e}")
+    else:
+        print("📁 Resource module not available on this platform; skipping FD limit adjustments.")

Also applies to: 628-637
282-282: Remove unnecessary f-string prefixes on constant strings

A few print statements use f-strings without interpolation; removing the f reduces linter noise (Ruff F541). Shall we clean these up? wdyt?
- print(f" 📂 S3 destination paths:") + print(" 📂 S3 destination paths:")- print(f" 📂 S3 source paths:") + print(" 📂 S3 source paths:")- print(f"\n🔍 [DEBUG] Unload vs Load File Comparison:") - print(f" Unload Summary:") + print("\n🔍 [DEBUG] Unload vs Load File Comparison:") + print(" Unload Summary:") @@ - print(f" Load Summary:") + print(" Load Summary:") @@ - print(f" ") + print(" ") @@ - print(f" ") + print(" ") - print(f" Potential Issues:") - print(f" - If file counts don't match: Load may be reading from wrong S3 path or missing files") - print(f" - If record counts don't match: Files may contain different data or path filters not working") - print(f" - Check S3 paths above to ensure unload and load are using same locations") + print(" Potential Issues:") + print(" - If file counts don't match: Load may be reading from wrong S3 path or missing files") + print(" - If record counts don't match: Files may contain different data or path filters not working") + print(" - Check S3 paths above to ensure unload and load are using same locations")Also applies to: 384-384, 489-503
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
examples/run_fast_lake_copy.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
examples/*
⚙️ CodeRabbit Configuration File
These scripts are intended for demonstration purposes only. They are not meant to represent production code. For these scripts, you should prefer brevity over robust error handling and handling of edge cases. These are demos - which should be as simple as possible to handle the 'blessed' use cases.
Files:
examples/run_fast_lake_copy.py
🔇 Additional comments (1)
examples/run_fast_lake_copy.py (1)
393-401: Nice: fast_load_stream call includes required lake_path_prefix

Good catch on the updated API: adding lake_path_prefix ensures loads target the correct S3 prefix per stream.
- Set RELOAD_INITIAL_SOURCE_DATA back to False
- Restore NUM_RECORDS to 100M
- Comment out multi-warehouse testing sections
- Add reload-only mode for raw data restoration
- Confirmed current table has only 1M records (lost 100M dataset)

Co-Authored-By: AJ Steers <[email protected]>
…A toggle
- Add clear warning that toggling RELOAD_INITIAL_SOURCE_DATA=True is destructive
- Emphasizes data loss and multi-hour reload time
- Prevents accidental dataset deletion

Co-Authored-By: AJ Steers <[email protected]>
Actionable comments posted: 0
♻️ Duplicate comments (3)
examples/run_fast_lake_copy.py (3)
197-197: Fix typo in S3 bucket name?

The bucket name has a typo: "ab-destiantion-iceberg-us-west-2" (should be "ab-destination-iceberg-us-west-2"). This will cause runtime errors when trying to access the bucket. Should we fix this typo? wdyt?
- print(" Using co-located bucket: ab-destiantion-iceberg-us-west-2 (us-west-2)") + print(" Using co-located bucket: ab-destination-iceberg-us-west-2 (us-west-2)")
201-201: Fix typo in bucket_name parameter?

Same typo here in the actual bucket name parameter that will be used for S3 operations.
- bucket_name="ab-destiantion-iceberg-us-west-2", + bucket_name="ab-destination-iceberg-us-west-2",
456-480: Fix variable shadowing that corrupts metrics?

The debug block reuses total_load_records, which shadows the function-scoped variable used for the returned metrics (line 404). This overwrites the aggregated value and causes incorrect results. Should we rename the inner accumulator to avoid this collision? wdyt?

-        print(f"   🔍 Debug: Load File Analysis for {stream_name}:")
-        if result.file_manifest:
-            total_load_records = 0
+        print(f"   🔍 Debug: Load File Analysis for {stream_name}:")
+        if result.file_manifest:
+            debug_total_records = 0
             print(f"      Files processed in load: {result.files_processed}")
             print(f"      Record count per file breakdown:")
             for i, file_info in enumerate(result.file_manifest[:10]):  # Show first 10 files
                 file_name = file_info.get('file', 'unknown')
                 rows_loaded = file_info.get('rows_loaded', 0)
-                total_load_records += rows_loaded
+                debug_total_records += rows_loaded
                 print(f"      Load File {i+1}: {file_name} -> {rows_loaded:,} records")
             if len(result.file_manifest) > 10:
                 remaining_files = result.file_manifest[10:]
                 remaining_records = sum(f.get('rows_loaded', 0) for f in remaining_files)
-                total_load_records += remaining_records
+                debug_total_records += remaining_records
                 print(f"      ... and {len(remaining_files)} more files -> {remaining_records:,} records")
-            print(f"      Total records from file breakdown: {total_load_records:,}")
+            print(f"      Total records from file breakdown: {debug_total_records:,}")
             print(f"      FastLoadResult.actual_record_count: {result.actual_record_count:,}")
-            if total_load_records != result.actual_record_count:
-                print(f"      ⚠️ MISMATCH: File breakdown ({total_load_records:,}) != actual_record_count ({result.actual_record_count:,})")
+            if debug_total_records != result.actual_record_count:
+                print(f"      ⚠️ MISMATCH: File breakdown ({debug_total_records:,}) != actual_record_count ({result.actual_record_count:,})")
             else:
                 print(f"      ✅ File breakdown matches actual_record_count")
🧹 Nitpick comments (1)
examples/run_fast_lake_copy.py (1)
58-58: Align comment with actual NUM_RECORDS value?

The comment says "Restore to 100M for reload process" but NUM_RECORDS is already set to 100M. Since this appears to be the final value, should we update the comment to something like "Total records to process (100 million for large-scale test)" to match the pattern at line 44? wdyt?
-NUM_RECORDS: int = 100_000_000  # Restore to 100M for reload process
+NUM_RECORDS: int = 100_000_000  # Total records to process (100 million for large-scale test)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
examples/run_fast_lake_copy.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
examples/*
⚙️ CodeRabbit Configuration File
These scripts are intended for demonstration purposes only. They are not meant to represent production code. For these scripts, you should prefer brevity over robust error handling and handling of edge cases. These are demos - which should be as simple as possible to handle the 'blessed' use cases.
Files:
examples/run_fast_lake_copy.py
🧬 Code Graph Analysis (1)
examples/run_fast_lake_copy.py (1)
airbyte/lakes.py (8)
  FastLoadResult (87-101)
  FastUnloadResult (70-84)
  S3LakeStorage (104-130)
  root_storage_path (35-37)
  root_storage_uri (30-32)
  root_storage_uri (128-130)
  root_storage_uri (150-152)
  get_stream_root_uri (50-55)
🔇 Additional comments (2)
examples/run_fast_lake_copy.py (2)
167-178: Nice implementation of the custom storage class!

The CustomS3LakeStorage implementation with configurable path prefix is clean and follows good OOP practices. This allows for flexible warehouse-specific testing paths.
643-680: Great use of commented-out multi-warehouse testing code!

The commented-out section shows a well-structured approach for testing across multiple warehouse configurations. This scaffolding makes it easy to enable comprehensive performance testing when needed. The logic for iterating through warehouse configs and collecting results is clean and maintainable.
Actionable comments posted: 6
🔭 Outside diff range comments (1)
airbyte/caches/base.py (1)
115-123: Fix super() call for config_hash – current form is incorrect

super(SqlConfig, self).config_hash bypasses SqlConfig in the MRO and will not find the property. Can we use super() here, wdyt?

     @final
     @property
     def config_hash(self) -> str | None:
         """Return a hash of the cache configuration.

         This is the same as the SQLConfig hash from the superclass.
         """
-        return super(SqlConfig, self).config_hash
+        return super().config_hash
♻️ Duplicate comments (6)
airbyte/lakes.py (2)
16-19: Make short_name an abstract property to enforce the contract

Right now short_name is only type-annotated in the base but not enforced. Subclasses could forget to set it and you'd get a runtime error when calling get_artifact_prefix(). Would you convert this into an abstract property so subclasses must implement it, wdyt?

 class LakeStorage(abc.ABC):
     """PyAirbyte LakeStorage class."""

-    def __init__(self) -> None:
-        """Initialize LakeStorage base class."""
-        self.short_name: str
+    def __init__(self) -> None:
+        """Initialize LakeStorage base class."""
+        # Subclasses must implement `short_name` via the abstract property below.
+        pass
+
+    @property
+    @abstractmethod
+    def short_name(self) -> str:
+        """Short, lowercase snake_case name of this lake backend (e.g., 's3', 'gcs')."""
+        raise NotImplementedError("Subclasses must implement this property.")
123-131: Offer: optional storage integration + safer repr to avoid credential exposure

To support secure Snowflake stages and reduce the chance of secrets leaking into logs, would you add an optional storage_integration_name and a safe __repr__, wdyt?

     def __init__(
         self,
         *,
         bucket_name: str,
         region: str,
         short_name: str = "s3",
         aws_access_key_id: str,
         aws_secret_access_key: str,
+        storage_integration_name: str | None = None,
     ) -> None:
         """Initialize S3LakeStorage with required parameters."""
         self.bucket_name = bucket_name
         self.region = region
         self.aws_access_key_id = aws_access_key_id
         self.aws_secret_access_key = aws_secret_access_key
-        self.short_name = self._validate_short_name(short_name)
+        self.short_name = self._validate_short_name(short_name)
+        self.storage_integration_name = storage_integration_name

Possible safe repr (outside the shown range):
def __repr__(self) -> str:
    return (
        f"S3LakeStorage(bucket_name='{self.bucket_name}', region='{self.region}', "
        f"aws_access_key_id='{self.aws_access_key_id[:4]}***', "
        f"aws_secret_access_key='***REDACTED***', "
        f"short_name='{self.short_name}', "
        f"storage_integration_name='{self.storage_integration_name}')"
    )

airbyte/caches/bigquery.py (1)
93-95: Good: explicit type-check over hasattr()

Using isinstance(lake_store, GCSLakeStorage) is clearer and safer than feature-detection via hasattr. Thanks for adopting this pattern.

airbyte/caches/snowflake.py (3)
117-121: Good: explicit S3-only path via isinstance

Using isinstance(lake_store, S3LakeStorage) is clear and future-proof compared to attribute probing.
297-311: Aggregate COPY load metadata across all files, not just the first row

RESULT_SCAN often returns multiple rows (per file). Summing across rows yields accurate totals. Would you switch to aggregations, wdyt?

-        first_row = file_manifest[0] if file_manifest else {}
-        record_count = first_row.get("rows_loaded") or first_row.get("rows_parsed")
-        total_data_size_bytes = first_row.get("input_bytes")
-        compressed_size_bytes = first_row.get("output_bytes")
+        record_count = sum(
+            int((r.get("rows_loaded") or r.get("rows_parsed") or 0) or 0)
+            for r in file_manifest
+        )
+        total_data_size_bytes = sum(int(r.get("input_bytes") or 0) for r in file_manifest)
+        compressed_size_bytes = sum(int(r.get("output_bytes") or 0) for r in file_manifest)
135-144: Avoid embedding AWS keys in SQL; prefer STORAGE INTEGRATION when available

Embedding secrets in DDL risks exposure in query history. Could we prefer a STORAGE_INTEGRATION when provided and fall back to credentials otherwise, wdyt?

-        create_stage_sql = f"""
-        CREATE STAGE IF NOT EXISTS {qualified_prefix}.{stage_name}
-            URL = '{lake_store.root_storage_uri}'
-            CREDENTIALS = (
-                AWS_KEY_ID = '{lake_store.aws_access_key_id}'
-                AWS_SECRET_KEY = '{lake_store.aws_secret_access_key}'
-            )
-            FILE_FORMAT = {qualified_prefix}.{file_format_name}
-        """
+        creds_clause = (
+            f"STORAGE_INTEGRATION = {getattr(lake_store, 'storage_integration_name', None)}"
+            if getattr(lake_store, "storage_integration_name", None)
+            else f"""CREDENTIALS = (
+                AWS_KEY_ID = '{lake_store.aws_access_key_id}'
+                AWS_SECRET_KEY = '{lake_store.aws_secret_access_key}'
+            )"""
+        )
+        create_stage_sql = f"""
+        CREATE STAGE IF NOT EXISTS {qualified_prefix}.{stage_name}
+            URL = '{lake_store.root_storage_uri}'
+            {creds_clause}
+            FILE_FORMAT = {qualified_prefix}.{file_format_name}
+        """

If you prefer, I can also add a simple identifier validation/quoting helper to guard stage/format names, wdyt?
🧹 Nitpick comments (6)
airbyte/lakes.py (1)
65-67: Good: central artifact prefix helper

Having get_artifact_prefix() on the base lets caches reuse a single convention. In Snowflake, would you call this instead of re-deriving the prefix to avoid duplication, wdyt?

airbyte/caches/bigquery.py (1)
101-108: Prefer public schema_name over private chain to form table names

Accessing self._read_processor.sql_config.schema_name reaches into private internals. Can we use self.schema_name instead, wdyt?

-        elif schema_name is not None:
-            qualified_table_name = f"{schema_name}.{table_name}"
-        else:
-            qualified_table_name = f"{self._read_processor.sql_config.schema_name}.{table_name}"
+        elif schema_name is not None:
+            qualified_table_name = f"{schema_name}.{table_name}"
+        else:
+            qualified_table_name = f"{self.schema_name}.{table_name}"

airbyte/caches/snowflake.py (1)
99-107: Reuse lake_store.get_artifact_prefix() instead of rebuilding the string

We already have LakeStorage.get_artifact_prefix(). Would you call that to avoid duplication and keep conventions single-sourced, wdyt?

     def _get_lake_artifact_prefix(self, lake_store: LakeStorage) -> str:
         """Get the artifact prefix for this lake storage."""
-        return f"AIRBYTE_LAKE_{lake_store.short_name.upper()}_"
+        return lake_store.get_artifact_prefix()

airbyte/caches/base.py (3)
387-420: Minor: resolve both placeholders in one pass to avoid double-formatting

Current logic formats twice; it works, but a single .format() with both keys is simpler and avoids surprises with nested braces. Would you consider consolidating, wdyt?

-        if "{stream_name}" in lake_store_prefix:
-            if stream_name is not None:
-                lake_store_prefix = lake_store_prefix.format(stream_name=stream_name)
-            else:
-                raise ValueError(
-                    "stream_name must be provided when lake_store_prefix contains {stream_name}."
-                )
-
-        if "{batch_id}" in lake_store_prefix:
-            batch_id = batch_id or generate_ulid()
-            lake_store_prefix = lake_store_prefix.format(
-                batch_id=batch_id,
-            )
+        needs_stream = "{stream_name}" in lake_store_prefix
+        needs_batch = "{batch_id}" in lake_store_prefix
+        if needs_stream and stream_name is None:
+            raise ValueError("stream_name must be provided when lake_store_prefix contains {stream_name}.")
+        if needs_batch:
+            batch_id = batch_id or generate_ulid()
+        lake_store_prefix = lake_store_prefix.format(
+            stream_name=stream_name if needs_stream else "{stream_name}",
+            batch_id=batch_id if needs_batch else "{batch_id}",
+        )
522-528: Docstring nit: this method loads, not unloads

The docstring says "Unload the cache…" but this is the load path. Shall we update it, wdyt?

     def fast_load_streams(
         self,
         lake_store: LakeStorage,
         *,
         lake_store_prefix: str,
         streams: list[str],
         zero_copy: bool = False,
     ) -> None:
-        """Unload the cache to a lake store.
+        """Load streams from a lake store into the cache.
554-556: Genericize the zero-copy message (base class shouldn't mention Snowflake)

This is a base helper used across caches. Would you switch to a backend-agnostic message, wdyt?

-        if zero_copy:
-            raise NotImplementedError("Zero-copy loading is not yet supported in Snowflake.")
+        if zero_copy:
+            raise NotImplementedError("Zero-copy loading is not yet supported for this cache.")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (5)
airbyte/caches/base.py (14 hunks)
airbyte/caches/bigquery.py (2 hunks)
airbyte/caches/snowflake.py (3 hunks)
airbyte/lakes.py (1 hunks)
examples/run_fast_lake_copy.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- examples/run_fast_lake_copy.py
🧰 Additional context used
🧬 Code Graph Analysis (4)
airbyte/caches/bigquery.py (5)
  airbyte/_processors/sql/bigquery.py (1)
    BigQueryConfig (37-97)
  airbyte/caches/base.py (3)
    fast_unload_table (485-511)
    _resolve_lake_store_path (387-419)
    fast_load_table (564-581)
  airbyte/lakes.py (8)
    FastLoadResult (99-117)
    FastUnloadResult (78-96)
    GCSLakeStorage (150-169)
    LakeStorage (13-67)
    root_storage_uri (30-32)
    root_storage_uri (145-147)
    root_storage_uri (167-169)
    get_stream_root_uri (50-55)
  airbyte/caches/snowflake.py (2)
    fast_unload_table (147-232)
    fast_load_table (235-320)
  airbyte/datasets/_sql.py (2)
    stream_name (81-82)
    to_sql_table (197-199)
airbyte/caches/snowflake.py (7)
  airbyte/lakes.py (7)
    S3LakeStorage (120-147)
    FastLoadResult (99-117)
    FastUnloadResult (78-96)
    LakeStorage (13-67)
    root_storage_uri (30-32)
    root_storage_uri (145-147)
    root_storage_uri (167-169)
  airbyte/_processors/sql/snowflake.py (2)
    SnowflakeConfig (36-175)
    SnowflakeSqlProcessor (203-283)
  airbyte/caches/base.py (5)
    CacheBase (45-613)
    execute_sql (125-147)
    fast_unload_table (485-511)
    processor (151-153)
    fast_load_table (564-581)
  airbyte/destinations/_translate_cache_to_dest.py (1)
    snowflake_cache_to_destination_configuration (100-114)
  airbyte/shared/sql_processor.py (2)
    RecordDedupeMode (75-79)
    SqlProcessorBase (159-1166)
  airbyte/caches/bigquery.py (2)
    fast_unload_table (71-125)
    fast_load_table (128-159)
  airbyte/datasets/_sql.py (1)
    stream_name (81-82)
airbyte/lakes.py (1)
  airbyte/_batch_handles.py (1)
    record_count (52-54)
airbyte/caches/base.py (9)
  airbyte/_util/text_util.py (1)
    generate_ulid (9-11)
  airbyte/caches/_state_backend_base.py (1)
    StateBackendBase (20-66)
  airbyte/lakes.py (3)
    FastLoadResult (99-117)
    FastUnloadResult (78-96)
    LakeStorage (13-67)
  airbyte/shared/sql_processor.py (1)
    SqlConfig (86-156)
  airbyte/_writers/base.py (1)
    name (24-32)
  airbyte/datasets/_sql.py (2)
    stream_name (81-82)
    to_sql_table (197-199)
  airbyte/caches/_catalog_backend.py (2)
    stream_names (78-80)
    stream_names (248-249)
  airbyte/shared/catalog_providers.py (1)
    stream_names (75-77)
  airbyte/caches/bigquery.py (1)
    fast_unload_table (71-125)
🔇 Additional comments (1)
airbyte/caches/snowflake.py (1)
91-93: Destination pairing name looks wrong for Snowflake

paired_destination_name = "destination-bigquery" seems like a copy/paste from BigQuery. Should this be "destination-snowflake" to match DestinationSnowflake, wdyt?

-    paired_destination_name: ClassVar[str | None] = "destination-bigquery"
+    paired_destination_name: ClassVar[str | None] = "destination-snowflake"
@override
def fast_load_table(
    self,
    table_name: str,
    lake_store: LakeStorage,
    lake_store_prefix: str,
    *,
    db_name: str | None = None,
    schema_name: str | None = None,
    zero_copy: bool = False,
) -> FastLoadResult:
    """Load a single stream from the lake store using BigQuery LOAD DATA.
fast_load_table: undefined stream_name, ignores lake_store_prefix, and no return

This will raise at runtime (NameError: stream_name). It also ignores the provided lake_store_prefix and never returns a FastLoadResult. Here's a focused fix that also reuses the same path-resolution approach as unload:
@override
def fast_load_table(
self,
table_name: str,
lake_store: LakeStorage,
lake_store_prefix: str,
*,
db_name: str | None = None,
schema_name: str | None = None,
zero_copy: bool = False,
) -> FastLoadResult:
"""Load a single stream from the lake store using BigQuery LOAD DATA.
This implementation uses BigQuery's native LOAD DATA functionality
to read directly from GCS, bypassing the Arrow dataset limitation.
"""
- sql_table = self.streams[stream_name].to_sql_table()
- table_name = sql_table.name
-
- if not hasattr(lake_store, "bucket_name"):
- raise NotImplementedError("BigQuery load currently only supports GCS lake storage")
-
- source_uri = f"{lake_store.get_stream_root_uri(stream_name)}*.parquet"
+ if db_name is not None and schema_name is None:
+ raise ValueError("If db_name is provided, schema_name must also be provided.")
+
+ if not isinstance(lake_store, GCSLakeStorage):
+ raise NotImplementedError("BigQuery load currently only supports GCS lake storage")
+
+ resolved_lake_store_prefix = self._resolve_lake_store_path(
+ lake_store_prefix=lake_store_prefix,
+ # Allow {stream_name} interpolation even when loading by table:
+ stream_name=table_name,
+ )
+ source_uri = f"{lake_store.root_storage_uri}{resolved_lake_store_prefix}/*.parquet"
load_statement = f"""
- LOAD DATA INTO {self._read_processor.sql_config.schema_name}.{table_name}
+ LOAD DATA INTO {(f"{db_name}.{schema_name}" if db_name and schema_name else (f"{schema_name}" if schema_name else self.schema_name))}.{table_name}
FROM FILES (
format = 'PARQUET',
uris = ['{source_uri}']
)
"""
- self.execute_sql(load_statement)
+ self.execute_sql(load_statement)
+ return FastLoadResult(
+ table_name=table_name,
+ lake_store=lake_store,
+ lake_store_prefix=resolved_lake_store_prefix,
+ )
🤖 Prompt for AI Agents
In airbyte/caches/bigquery.py around lines 127 to 138, the fast_load_table
implementation references an undefined variable stream_name, ignores the
lake_store_prefix when building the GCS path, and does not return a
FastLoadResult; change the code to accept or derive the stream name from
table_name (or pass it in), construct the GCS path using the same lake_store +
lake_store_prefix resolution logic used by unload (join prefix and stream/table
path safely), use that path in the BigQuery LOAD job configuration, and ensure
the function returns a FastLoadResult with the job id / status (or appropriate
metadata) after the load completes or is submitted.
unload_statement = f""" | ||
COPY INTO @{qualified_prefix}.{stage_name}/{lake_store_prefix}/ | ||
FROM {qualified_table_name} | ||
FILE_FORMAT = {qualified_prefix}.{file_format_name} | ||
OVERWRITE = TRUE | ||
""" | ||
|
🛠️ Refactor suggestion
Resolve lake_store_prefix (supports {stream_name}/{batch_id}) before building SQL

As-is, placeholders won't be interpolated and paths may contain literal braces. Would you resolve the prefix using the base helper, wdyt?
- unload_statement = f"""
+ resolved_lake_store_prefix = self._resolve_lake_store_path(
+ lake_store_prefix=lake_store_prefix,
+ stream_name=stream_name or table_name,
+ )
+
+ unload_statement = f"""
COPY INTO @{qualified_prefix}.{stage_name}/{lake_store_prefix}/
FROM {qualified_table_name}
FILE_FORMAT = {qualified_prefix}.{file_format_name}
OVERWRITE = TRUE
"""
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In airbyte/caches/snowflake.py around lines 192 to 198, lake_store_prefix may
contain placeholders like {stream_name}/{batch_id} which are not being resolved
before composing the COPY SQL; call the existing base helper to resolve/format
lake_store_prefix with the appropriate values (stream_name, batch_id or context)
into a concrete path, assign that resolved string to a variable, and use that
variable in the f-string that builds unload_statement so the SQL contains the
actual path instead of literal braces.
metadata_row = result_scan_result.fetchone()

total_data_size_bytes = None
compressed_size_bytes = None
file_manifest = []

if metadata_row:
    row_dict = (
        dict(metadata_row._mapping)  # noqa: SLF001
        if hasattr(metadata_row, "_mapping")
        else dict(metadata_row)
    )
    file_manifest.append(row_dict)

    record_count = row_dict.get("rows_unloaded")
    total_data_size_bytes = row_dict.get("input_bytes")
    compressed_size_bytes = row_dict.get("output_bytes")

return FastUnloadResult(
Initialize record_count to avoid UnboundLocalError when no metadata rows

If RESULT_SCAN returns no rows, record_count is undefined at return. Can we initialize it to None before the conditional, wdyt?
- metadata_row = result_scan_result.fetchone()
-
- total_data_size_bytes = None
- compressed_size_bytes = None
- file_manifest = []
+ metadata_row = result_scan_result.fetchone()
+
+ record_count = None
+ total_data_size_bytes = None
+ compressed_size_bytes = None
+ file_manifest = []
🤖 Prompt for AI Agents
In airbyte/caches/snowflake.py around lines 205 to 223, record_count is only
assigned inside the metadata_row conditional so if RESULT_SCAN returns no rows
it remains undefined; initialize record_count = None alongside
total_data_size_bytes and compressed_size_bytes before the if block, and keep
assigning it from row_dict.get("rows_unloaded") inside the conditional so the
function can safely return when no metadata rows are present.
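For context on where these metadata rows come from: Snowflake exposes per-file COPY INTO results through RESULT_SCAN over the last query. A minimal sketch of that general pattern, not the PR's exact code (table and stage names are illustrative):

from sqlalchemy import text


def fetch_copy_metadata(connection) -> list[dict]:
    # Run the COPY, then read back its per-file metadata rows.
    connection.execute(text("COPY INTO my_table FROM @my_stage/my_prefix/"))
    result = connection.execute(
        text("SELECT * FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))")
    )
    # Each row describes one file (e.g., rows_loaded, input_bytes, output_bytes);
    # returning a list means the zero-row case is handled naturally by the caller.
    return [dict(row._mapping) for row in result]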
return FastUnloadResult(
    stream_name=stream_name,
    table_name=table_name,
    lake_store=lake_store,
    lake_store_prefix=lake_store_prefix,
    record_count=record_count,
    total_data_size_bytes=total_data_size_bytes,
    compressed_size_bytes=compressed_size_bytes,
    file_manifest=file_manifest,
🛠️ Refactor suggestion
Return resolved prefix to the caller
Once the prefix is resolved, the result should reflect it for round-tripping. Shall we return resolved_lake_store_prefix, wdyt?
return FastUnloadResult(
stream_name=stream_name,
table_name=table_name,
lake_store=lake_store,
- lake_store_prefix=lake_store_prefix,
+ lake_store_prefix=resolved_lake_store_prefix,
record_count=record_count,
total_data_size_bytes=total_data_size_bytes,
compressed_size_bytes=compressed_size_bytes,
file_manifest=file_manifest,
)
🤖 Prompt for AI Agents
In airbyte/caches/snowflake.py around lines 223 to 231, the FastUnloadResult is
currently populated with the original lake_store_prefix instead of the resolved
prefix; update the returned FastUnloadResult to use resolved_lake_store_prefix
(the value computed earlier) for its lake_store_prefix field so callers receive
the resolved path for round-tripping.
load_statement = f""" | ||
COPY INTO {qualified_table_name} | ||
FROM @{qualified_prefix}.{stage_name}/{lake_store_prefix}/ | ||
FILE_FORMAT = {qualified_prefix}.{file_format_name} | ||
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE | ||
PURGE = FALSE | ||
""" |
🛠️ Refactor suggestion
Also resolve prefix on load path prior to COPY INTO
Same reasoning as unload—this ensures {stream_name}/{batch_id} are expanded. Use the resolved value in the SQL and in the returned result, wdyt?
- load_statement = f"""
+ resolved_lake_store_prefix = self._resolve_lake_store_path(
+ lake_store_prefix=lake_store_prefix,
+ stream_name=table_name,
+ )
+
+ load_statement = f"""
COPY INTO {qualified_table_name}
- FROM @{qualified_prefix}.{stage_name}/{lake_store_prefix}/
+ FROM @{qualified_prefix}.{stage_name}/{resolved_lake_store_prefix}/
FILE_FORMAT = {qualified_prefix}.{file_format_name}
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
PURGE = FALSE
"""
🤖 Prompt for AI Agents
In airbyte/caches/snowflake.py around lines 278 to 284, resolve the prefix
variable (expanding {stream_name}/{batch_id}) before constructing the COPY INTO
SQL so the load path is fully expanded; then use that resolved prefix in the
f-string SQL (COPY INTO ... FROM
@{resolved_prefix}.{stage_name}/{lake_store_prefix}/ ...) and also return or
include the same resolved prefix in the function's result object so callers see
the exact load path used. Make sure to compute the resolved value once, reuse it
in both the SQL and the returned payload, and preserve existing
quoting/qualification logic.
return FastLoadResult(
    table_name=table_name,
    lake_store=lake_store,
    lake_store_prefix=lake_store_prefix,
    record_count=record_count,
    total_data_size_bytes=total_data_size_bytes,
    compressed_size_bytes=compressed_size_bytes,
    file_manifest=file_manifest,
)
🛠️ Refactor suggestion
Return the resolved prefix from load() as well
For symmetry and correctness in subsequent helpers, should we return the resolved prefix, wdyt?
return FastLoadResult(
table_name=table_name,
lake_store=lake_store,
- lake_store_prefix=lake_store_prefix,
+ lake_store_prefix=resolved_lake_store_prefix,
record_count=record_count,
total_data_size_bytes=total_data_size_bytes,
compressed_size_bytes=compressed_size_bytes,
file_manifest=file_manifest,
)
🤖 Prompt for AI Agents
In airbyte/caches/snowflake.py around lines 312 to 320, the load() function
currently returns FastLoadResult without including the resolved lake store
prefix; update the FastLoadResult construction to include the resolved prefix
(e.g., resolved_prefix or lake_store_prefix_resolved) so downstream helpers get
the final computed prefix, and then propagate this new field to all call sites
and associated type definitions/tests (update FastLoadResult dataclass/typing,
adjust callers to accept the new attribute, and update any unit tests or usages
that assert the returned structure).
Summary by CodeRabbit
New Features
Documentation
Chores